You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/04/13 14:54:30 UTC

[storm] branch master updated: [STORM-1293] port storm.messaging.netty-integration-test to java (#3233)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a382d4  [STORM-1293] port storm.messaging.netty-integration-test to java (#3233)
1a382d4 is described below

commit 1a382d477b550723984e46acf24dd957fd043aa3
Author: nd368 <na...@hotmail.co.uk>
AuthorDate: Mon Apr 13 15:54:13 2020 +0100

    [STORM-1293] port storm.messaging.netty-integration-test to java (#3233)
---
 .../storm/messaging/netty_integration_test.clj     | 66 ---------------
 .../storm/messaging/NettyIntegrationTest.java      | 98 ++++++++++++++++++++++
 2 files changed, 98 insertions(+), 66 deletions(-)

diff --git a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
deleted file mode 100644
index 8e813af..0000000
--- a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
+++ /dev/null
@@ -1,66 +0,0 @@
-
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.messaging.netty-integration-test
-  (:use [clojure test])
-  (:import [org.apache.storm.messaging TransportFactory]
-           [org.apache.storm Thrift Testing LocalCluster$Builder])
-  (:import [org.apache.storm.testing CompleteTopologyParam MockedSources TestWordSpout TestGlobalCount])
-  (:import [org.apache.storm.utils Utils])
-  (:use [org.apache.storm util config]))
-
-(deftest test-integration
-  (with-open [cluster (.build (doto (LocalCluster$Builder.)
-                                (.withSimulatedTime)
-                                (.withSupervisors 4)
-                                (.withSupervisorSlotPortMin 6710)
-                                (.withDaemonConf {STORM-LOCAL-MODE-ZMQ true 
-                                                    STORM-MESSAGING-TRANSPORT  "org.apache.storm.messaging.netty.Context"
-                                                    STORM-MESSAGING-NETTY-AUTHENTICATION false
-                                                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
-                                                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
-                                                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
-                                                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
-                                                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1})))]
-    (let [topology (Thrift/buildTopology
-                     {"1" (Thrift/prepareSpoutDetails
-                            (TestWordSpout. true) (Integer. 4))}
-                     {"2" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "1" nil)
-                             (Thrift/prepareShuffleGrouping)}
-                            (TestGlobalCount.) (Integer. 6))})
-          results (Testing/completeTopology cluster
-                                     topology
-                                     (doto (CompleteTopologyParam.)
-                                       ;; important for this test: the mocked source set below must
-                                       ;; supply a number of tuples that is a multiple of both 4 and 6
-                                       (.setStormConf {TOPOLOGY-WORKERS 3})
-                                       (.setMockedSources
-                                         (MockedSources. {"1" [["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ]}
-                                     ))))]
-        (is (= (* 6 4) (.size (Testing/readTuples results "2")))))))
diff --git a/storm-core/test/jvm/org/apache/storm/messaging/NettyIntegrationTest.java b/storm-core/test/jvm/org/apache/storm/messaging/NettyIntegrationTest.java
new file mode 100644
index 0000000..ddb8663
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/messaging/NettyIntegrationTest.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.messaging;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.Builder;
+import org.apache.storm.Testing;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.testing.CompleteTopologyParam;
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MockedSources;
+import org.apache.storm.testing.TestGlobalCount;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Values;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+
+@IntegrationTest
+public class NettyIntegrationTest {
+
+    @Test
+    public void testIntegration() throws Exception {
+        Map<String, Object> daemonConf = new HashMap<>();
+        daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true);
+        daemonConf.put(Config.STORM_MESSAGING_TRANSPORT, "org.apache.storm.messaging.netty.Context");
+        daemonConf.put(Config.STORM_MESSAGING_NETTY_AUTHENTICATION, false);
+        daemonConf.put(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE, 1024000);
+        daemonConf.put(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS, 1000);
+        daemonConf.put(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS, 5000);
+        daemonConf.put(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS, 1);
+        daemonConf.put(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS, 1);
+
+        Builder builder = new Builder()
+                .withSimulatedTime()
+                .withSupervisors(4)
+                .withSupervisorSlotPortMin(6710)
+                .withDaemonConf(daemonConf);
+
+        try (LocalCluster cluster = builder.build()) {
+
+            TopologyBuilder topologyBuilder = new TopologyBuilder();
+            topologyBuilder.setSpout("1", new TestWordSpout(true), 4);
+            topologyBuilder.setBolt("2", new TestGlobalCount(), 6).shuffleGrouping("1");
+            StormTopology topology = topologyBuilder.createTopology();
+
+            // important for test that tuples = multiple of 4 and 6
+            List<FixedTuple> testTuples = Stream.of("a", "b",
+                                                    "a", "b",
+                                                    "a", "b",
+                                                    "a", "b",
+                                                    "a", "b",
+                                                    "a", "b",
+                                                    "a", "b",
+                                                    "a", "b",
+                                                    "a", "b",
+                                                    "a", "b",
+                                                    "a", "b",
+                                                    "a", "b")
+                    .map(value -> new FixedTuple(new Values(value)))
+                    .collect(Collectors.toList());
+
+            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
+
+            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
+            completeTopologyParams.setStormConf(Collections.singletonMap(Config.TOPOLOGY_WORKERS, 3));
+            completeTopologyParams.setMockedSources(mockedSources);
+
+            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
+
+            assertEquals(6 * 4, Testing.readTuples(results, "2").size());
+        }
+    }
+}