You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2021/02/17 17:08:17 UTC

[camel-quarkus] branch master updated: Add basic Netty UDP tests

This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/master by this push:
     new 20b938e  Add basic Netty UDP tests
20b938e is described below

commit 20b938eef1aa8b3da9ee8e2d18ce956deb52b78d
Author: James Netherton <ja...@gmail.com>
AuthorDate: Wed Feb 17 15:07:32 2021 +0000

    Add basic Netty UDP tests
    
    Fixes #2187
---
 integration-tests/netty/pom.xml                    | 21 +++++-
 .../camel/quarkus/component/netty/CamelRoute.java  | 27 -------
 .../quarkus/component/netty/NettyCodecHelper.java  | 76 ++++++++++++++++++++
 .../quarkus/component/netty/NettyResource.java     | 83 ++++++++++++++++++++++
 .../camel/quarkus/component/netty/NettyRoutes.java | 52 ++++++++++++++
 .../camel/quarkus/component/netty/NettyTest.java   | 57 ++++++++++-----
 .../quarkus/component/netty/NettyTestResource.java |  7 +-
 7 files changed, 276 insertions(+), 47 deletions(-)

diff --git a/integration-tests/netty/pom.xml b/integration-tests/netty/pom.xml
index 22a6638..d553d2b 100644
--- a/integration-tests/netty/pom.xml
+++ b/integration-tests/netty/pom.xml
@@ -34,11 +34,13 @@
             <groupId>org.apache.camel.quarkus</groupId>
             <artifactId>camel-quarkus-netty</artifactId>
         </dependency>
-
-        <!-- include in build for integration testing support -->
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-seda</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
-            <artifactId>quarkus-undertow</artifactId>
+            <artifactId>quarkus-resteasy</artifactId>
         </dependency>
 
         <!-- test dependencies -->
@@ -74,6 +76,19 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-seda-deployment</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
 
diff --git a/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/CamelRoute.java b/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/CamelRoute.java
deleted file mode 100644
index b19ca3e..0000000
--- a/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/CamelRoute.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.quarkus.component.netty;
-
-import org.apache.camel.builder.RouteBuilder;
-
-public class CamelRoute extends RouteBuilder {
-    @Override
-    public void configure() {
-        from("netty:tcp://0.0.0.0:{{camel.netty.test-port}}?textline=true&sync=true")
-                .setBody().constant("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.");
-    }
-}
diff --git a/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyCodecHelper.java b/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyCodecHelper.java
new file mode 100644
index 0000000..ef5cc72
--- /dev/null
+++ b/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyCodecHelper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.netty;
+
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import org.apache.camel.component.netty.ChannelHandlerFactories;
+import org.apache.camel.component.netty.ChannelHandlerFactory;
+
+public final class NettyCodecHelper {
+
+    private NettyCodecHelper() {
+        // Utility class
+    }
+
+    public static ChannelHandlerFactory createNullDelimitedHandler(String protocol) {
+        // A single NUL (0x00) delimiter is sufficient; frames are capped at 4096 bytes.
+        ByteBuf[] delimiters = new ByteBuf[] { Unpooled.wrappedBuffer(new byte[] { 0 }) };
+        return ChannelHandlerFactories.newDelimiterBasedFrameDecoder(4096, delimiters, protocol);
+    }
+
+    public static BytesDecoder createBytesDecoder() {
+        return new BytesDecoder();
+    }
+
+    public static BytesEncoder createBytesEncoder() {
+        return new BytesEncoder();
+    }
+
+    @ChannelHandler.Sharable
+    static class BytesDecoder extends MessageToMessageDecoder<ByteBuf> {
+
+        @Override
+        protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
+            if (msg.isReadable()) {
+                byte[] bytes = new byte[msg.readableBytes()];
+                int readerIndex = msg.readerIndex();
+                msg.getBytes(readerIndex, bytes);
+                out.add(bytes);
+            }
+        }
+    }
+
+    @ChannelHandler.Sharable
+    static class BytesEncoder extends MessageToMessageEncoder<byte[]> {
+
+        @Override
+        protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception {
+            byte[] bytes = msg;
+            ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(bytes.length);
+            buf.writeBytes(bytes);
+            out.add(buf);
+        }
+    }
+}
diff --git a/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyResource.java b/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyResource.java
new file mode 100644
index 0000000..25104c7
--- /dev/null
+++ b/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyResource.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.netty;
+
+import java.nio.charset.StandardCharsets;
+
+import javax.inject.Inject;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+
+import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.ProducerTemplate;
+
+@Path("/netty")
+public class NettyResource {
+
+    @Inject
+    ProducerTemplate producerTemplate;
+
+    @Inject
+    ConsumerTemplate consumerTemplate;
+
+    @Path("/tcp")
+    @POST
+    public String sendNettyTcpMessage(String message) {
+        return producerTemplate.requestBody("netty:tcp://localhost:{{camel.netty.test-tcp-port}}?textline=true&sync=true",
+                message, String.class);
+    }
+
+    @Path("/udp")
+    @POST
+    public String sendNettyUdpMessage(String message) {
+        return producerTemplate.requestBody("netty:udp://localhost:{{camel.netty.test-udp-port}}?sync=true", message,
+                String.class);
+    }
+
+    @Path("/tcp/codec")
+    @POST
+    public Object sendNettyTcpMessageWithCodec(String message) {
+        producerTemplate.sendBody("netty:tcp://localhost:{{camel.netty.test-codec-tcp-port}}?disconnect=true"
+                + "&sync=false&allowDefaultCodec=false"
+                + "&decoders=#tcpNullDelimitedHandler,#bytesDecoder"
+                + "&encoders=#bytesEncoder", createNullDelimitedMessage(message));
+
+        return consumerTemplate.receiveBody("seda:custom-tcp-codec", 5000, String.class);
+    }
+
+    @Path("/udp/codec")
+    @POST
+    public Object sendNettyUdpMessageWithCodec(String message) {
+        producerTemplate.sendBody("netty:udp://localhost:{{camel.netty.test-codec-udp-port}}?sync=false"
+                + "&udpByteArrayCodec=true", message.getBytes(StandardCharsets.UTF_8));
+
+        return consumerTemplate.receiveBody("seda:custom-udp-codec", 5000, String.class);
+    }
+
+    /**
+     * Returns the UTF-8 bytes of {@code message} followed by two zero (NUL) bytes. Safe for any
+     * message length, including the empty string.
+     */
+    private byte[] createNullDelimitedMessage(String message) {
+        byte[] messageBytes = message.getBytes(StandardCharsets.UTF_8);
+        // A freshly allocated array is zero-filled, so the two trailing NUL bytes need no explicit
+        // write; indexing the trailer by message.length() would go out of bounds for short input.
+        byte[] bytes = new byte[messageBytes.length + 2];
+        System.arraycopy(messageBytes, 0, bytes, 0, messageBytes.length);
+        return bytes;
+    }
+}
diff --git a/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyRoutes.java b/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyRoutes.java
new file mode 100644
index 0000000..117d5f6
--- /dev/null
+++ b/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyRoutes.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.netty;
+
+import io.netty.channel.ChannelHandler;
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.netty.ChannelHandlerFactory;
+
+public class NettyRoutes extends RouteBuilder {
+
+    @BindToRegistry("tcpNullDelimitedHandler")
+    private ChannelHandlerFactory tcpNullDelimitedHandler = NettyCodecHelper.createNullDelimitedHandler("tcp");
+
+    @BindToRegistry("bytesDecoder")
+    private ChannelHandler bytesDecoder = NettyCodecHelper.createBytesDecoder();
+
+    @BindToRegistry("bytesEncoder")
+    private ChannelHandler bytesEncoder = NettyCodecHelper.createBytesEncoder();
+
+    @Override
+    public void configure() {
+        from("netty:tcp://localhost:{{camel.netty.test-tcp-port}}?textline=true&sync=true")
+                .transform().simple("Hello ${body} TCP");
+
+        from("netty:udp://localhost:{{camel.netty.test-udp-port}}?sync=true")
+                .transform().simple("Hello ${body} UDP");
+
+        from("netty:tcp://localhost:{{camel.netty.test-codec-tcp-port}}?disconnect=true&sync=false&allowDefaultCodec=false&decoders=#tcpNullDelimitedHandler,#bytesDecoder&encoders=#bytesEncoder")
+                .convertBodyTo(String.class)
+                .transform().simple("Hello ${body} TCP")
+                .to("seda:custom-tcp-codec");
+
+        from("netty:udp://localhost:{{camel.netty.test-codec-udp-port}}?udpByteArrayCodec=true&sync=false")
+                .transform().simple("Hello ${body} UDP")
+                .to("seda:custom-udp-codec");
+    }
+}
diff --git a/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTest.java b/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTest.java
index 850c98e..d49fdc4 100644
--- a/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTest.java
+++ b/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTest.java
@@ -16,35 +16,60 @@
  */
 package org.apache.camel.quarkus.component.netty;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.net.Socket;
 
 import io.quarkus.test.common.QuarkusTestResource;
 import io.quarkus.test.junit.QuarkusTest;
-import org.junit.jupiter.api.Assertions;
+import io.restassured.RestAssured;
 import org.junit.jupiter.api.Test;
 
+import static org.hamcrest.Matchers.is;
+
 @QuarkusTest
 @QuarkusTestResource(NettyTestResource.class)
 class NettyTest {
-    private static final String POEM = "Epitaph in Kohima, India marking the WWII Battle of Kohima and Imphal, Burma Campaign - Attributed to John Maxwell Edmonds";
-    private static final String EXPECTED_RESPONSE = "When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.";
 
     @Test
-    public void testPoem() throws IOException {
+    public void testNettyTcpProduceConsume() throws IOException {
+        RestAssured.given()
+                .body("Camel Quarkus Netty")
+                .post("/netty/tcp")
+                .then()
+                .statusCode(200)
+                .body(is("Hello Camel Quarkus Netty TCP"));
+
+    }
 
-        try (
-                final Socket socket = new Socket("localhost", Integer.getInteger("camel.netty.test-port"));
-                final PrintWriter outputWriter = new PrintWriter(socket.getOutputStream(), true);
-                final BufferedReader inputReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));) {
-            outputWriter.println(POEM);
-            String response = inputReader.readLine();
-            Assertions.assertTrue(response.equalsIgnoreCase(EXPECTED_RESPONSE), "Response did not match expected response");
-        }
+    @Test
+    public void testNettyTcpProduceConsumeWithCodec() throws IOException {
+        String message = "Camel Quarkus Netty Custom Codec";
+        RestAssured.given()
+                .body(message)
+                .post("/netty/tcp/codec")
+                .then()
+                .statusCode(200)
+                .body(is("Hello Camel Quarkus Netty Custom Codec TCP"));
+    }
 
+    @Test
+    public void testNettyUdpProduceConsumeWithCodec() throws IOException {
+        String message = "Camel Quarkus Netty Custom Codec";
+        RestAssured.given()
+                .body(message)
+                .post("/netty/udp/codec")
+                .then()
+                .statusCode(200)
+                .body(is("Hello Camel Quarkus Netty Custom Codec UDP"));
+    }
+
+    @Test
+    public void testNettyUdpProduceConsume() throws IOException {
+        RestAssured.given()
+                .body("Camel Quarkus Netty")
+                .post("/netty/udp")
+                .then()
+                .statusCode(200)
+                .body(is("Hello Camel Quarkus Netty UDP"));
     }
 
 }
diff --git a/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTestResource.java b/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTestResource.java
index 79e8452..7bff80b 100644
--- a/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTestResource.java
+++ b/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTestResource.java
@@ -25,7 +25,12 @@ import org.apache.camel.quarkus.test.AvailablePortFinder;
 public class NettyTestResource implements QuarkusTestResourceLifecycleManager {
     @Override
     public Map<String, String> start() {
-        return AvailablePortFinder.reserveNetworkPorts(Objects::toString, "camel.netty.test-port");
+        return AvailablePortFinder.reserveNetworkPorts(
+                Objects::toString,
+                "camel.netty.test-tcp-port",
+                "camel.netty.test-codec-tcp-port",
+                "camel.netty.test-udp-port",
+                "camel.netty.test-codec-udp-port");
     }
 
     @Override