You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/03/02 08:14:37 UTC

[camel-kafka-connector] 01/02: Add Netty source itest #1036

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch backport-netty-itests
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 4c215981a84ae8103f059b1b35d1482970d7a874
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Thu Feb 25 23:17:26 2021 +0900

    Add Netty source itest #1036
---
 .../common/test/CamelSourceTestSupport.java        |  15 +++
 tests/itests-netty/pom.xml                         |  45 +++++++++
 .../netty/source/CamelNettyPropertyFactory.java    |  57 +++++++++++
 .../netty/source/CamelSourceNettyITCase.java       | 110 +++++++++++++++++++++
 tests/pom.xml                                      |   1 +
 5 files changed, 228 insertions(+)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
index 5bb6a93..016525a 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -88,6 +88,21 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
      * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
      *
      * @param connectorPropertyFactory A factory for connector properties
+     * @param topic the topic the test consumer reads the messages from
+     * @param count the number of messages the test consumer is set up for
+     * @throws Exception For test-specific exceptions
+     */
+    public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws ExecutionException, InterruptedException {
+        // Build a string consumer against the test's Kafka service, then hand over to the consumer-based runner
+        String bootstrapServers = getKafkaService().getBootstrapServers();
+        KafkaClient<String, String> client = new KafkaClient<>(bootstrapServers);
+        runTestBlocking(connectorPropertyFactory, new StringMessageConsumer(client, topic, count));
+    }
+
+    /**
+     * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+     *
+     * @param connectorPropertyFactory A factory for connector properties
      * @param consumer A Kafka consumer consumer for the test messages
      * @throws Exception For test-specific exceptions
      */
diff --git a/tests/itests-netty/pom.xml b/tests/itests-netty/pom.xml
new file mode 100644
index 0000000..1ef8735
--- /dev/null
+++ b/tests/itests-netty/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-netty</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: Netty</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-netty</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java
new file mode 100644
index 0000000..d0f6132
--- /dev/null
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.netty.source;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+final class CamelNettyPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyPropertyFactory> {
+
+    private CamelNettyPropertyFactory() {
+    }
+
+    public CamelNettyPropertyFactory withProtocol(String value) {
+        return setProperty("camel.source.path.protocol", value);
+    }
+
+    public CamelNettyPropertyFactory withHost(String value) {
+        return setProperty("camel.source.path.host", value);
+    }
+
+    public CamelNettyPropertyFactory withPort(int value) {
+        return setProperty("camel.source.path.port", value);
+    }
+
+    public CamelNettyPropertyFactory withSync(boolean value) {
+        return setProperty("camel.source.endpoint.sync", value);
+    }
+
+    public EndpointUrlBuilder<CamelNettyPropertyFactory> withUrl(String protocol, String host, int port) {
+        String url = String.format("netty:%s://%s:%s", protocol, host, port);
+        return new EndpointUrlBuilder<>(this::withSourceUrl, url);
+    }
+
+    public static CamelNettyPropertyFactory basic() {
+        return new CamelNettyPropertyFactory()
+                .withName("CamelNettySourceConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.netty.CamelNettySourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
new file mode 100644
index 0000000..6c76789
--- /dev/null
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.netty.source;
+
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSourceNettyITCase extends CamelSourceTestSupport {
+    private static final int PORT = NetworkUtils.getFreePort("localhost");
+
+    private final int expect = 1;
+    private String topicName;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-netty-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        topicName = getTopicForTest(this);
+    }
+
+    @Override
+    protected void produceTestData() {
+        try {
+            Thread.sleep(3000); // TODO necessary to wait for ckc netty endpoint to be up and ready
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible instead of swallowing it
+        }
+        sendMessage();
+    }
+
+    void sendMessage() {
+        try (Socket s = new Socket("localhost", PORT);
+             PrintWriter out = new PrintWriter(s.getOutputStream())) {
+            out.print("Hello CKC!");
+            out.flush();
+        } catch (Exception e) {
+            fail(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        // Assert the count first: on an empty result get(0) would throw before any assertion ran
+        assertEquals(expect, consumer.consumedMessages().size(), "Did not receive as many messages as expected");
+        Object receivedObject = consumer.consumedMessages().get(0).value();
+        assertEquals("Hello CKC!", receivedObject, "Received message content differed");
+    }
+
+    @Test
+    @Timeout(30)
+    @Disabled("Camel-Netty-* connectors are not working #924")
+    public void testLaunchConnector() throws ExecutionException, InterruptedException {
+        CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withProtocol("tcp")
+                .withHost("localhost")
+                .withPort(PORT)
+                // one-way as test client doesn't receive response
+                .withSync(false);
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Test
+    @Timeout(30)
+    public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException {
+        CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withUrl("tcp", "localhost", PORT)
+                // one-way as test client doesn't receive response
+                .append("sync", "false")
+                .buildUrl();
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
+    }
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index 26aa43f..b1ec328 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -62,6 +62,7 @@
         <module>itests-ssh</module>
         <module>itests-sql</module>
         <module>itests-cxf</module>
+        <module>itests-netty</module>
     </modules>