You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/03/02 08:14:36 UTC

[camel-kafka-connector] branch backport-netty-itests created (now e740f91)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch backport-netty-itests
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


      at e740f91  Add netty sink itest #1036

This branch includes the following new commits:

     new 4c21598  Add Netty source itest #1036
     new e740f91  Add netty sink itest #1036

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector] 02/02: Add netty sink itest #1036

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch backport-netty-itests
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e740f911f0bd8bbd5adf244073c2a89e462935bc
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Tue Mar 2 11:53:49 2021 +0900

    Add netty sink itest #1036
---
 .../netty/sink/CamelNettyPropertyFactory.java      |  61 +++++++++++
 .../netty/sink/CamelSinkNettyITCase.java           | 119 +++++++++++++++++++++
 .../netty/source/CamelSourceNettyITCase.java       |   5 +-
 3 files changed, 182 insertions(+), 3 deletions(-)

diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelNettyPropertyFactory.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelNettyPropertyFactory.java
new file mode 100644
index 0000000..31b5343
--- /dev/null
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelNettyPropertyFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.netty.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+/**
+ * Property factory for the Netty sink connector used by the integration tests
+ * in this package.
+ *
+ * <p>The {@code with*} setters each store a single connector property through
+ * {@code setProperty} and return a factory of this type so calls can be chained.
+ * The constructor is private; instances are obtained from {@link #basic()}.
+ */
+final class CamelNettyPropertyFactory extends SinkConnectorPropertyFactory<CamelNettyPropertyFactory> {
+
+    // Not instantiable from outside; use basic().
+    private CamelNettyPropertyFactory() {
+    }
+
+    /**
+     * Stores the given value under {@code camel.sink.path.protocol}.
+     *
+     * @param value the protocol part of the endpoint path
+     * @return the factory returned by {@code setProperty}, for chaining
+     */
+    public CamelNettyPropertyFactory withProtocol(String value) {
+        return setProperty("camel.sink.path.protocol", value);
+    }
+
+    /**
+     * Stores the given value under {@code camel.sink.path.host}.
+     *
+     * @param value the host part of the endpoint path
+     * @return the factory returned by {@code setProperty}, for chaining
+     */
+    public CamelNettyPropertyFactory withHost(String value) {
+        return setProperty("camel.sink.path.host", value);
+    }
+
+    /**
+     * Stores the given value under {@code camel.sink.path.port}.
+     *
+     * @param value the port part of the endpoint path
+     * @return the factory returned by {@code setProperty}, for chaining
+     */
+    public CamelNettyPropertyFactory withPort(int value) {
+        return setProperty("camel.sink.path.port", value);
+    }
+
+    /**
+     * Stores the given flag under {@code camel.sink.endpoint.disconnect}.
+     *
+     * @param value the value of the endpoint's {@code disconnect} option
+     * @return the factory returned by {@code setProperty}, for chaining
+     */
+    public CamelNettyPropertyFactory withDisconnect(boolean value) {
+        return setProperty("camel.sink.endpoint.disconnect", value);
+    }
+
+    /**
+     * Stores the given flag under {@code camel.sink.endpoint.sync}.
+     *
+     * @param value the value of the endpoint's {@code sync} option
+     * @return the factory returned by {@code setProperty}, for chaining
+     */
+    public CamelNettyPropertyFactory withSync(boolean value) {
+        return setProperty("camel.sink.endpoint.sync", value);
+    }
+
+    /**
+     * Starts a URL-based configuration of the form
+     * {@code netty:<protocol>://<host>:<port>}.
+     *
+     * @param protocol the protocol placed after {@code netty:}
+     * @param host the host placed after {@code ://}
+     * @param port the port placed after the host
+     * @return an {@link EndpointUrlBuilder} created with {@code this::withSinkUrl} and the formatted URL
+     */
+    public EndpointUrlBuilder<CamelNettyPropertyFactory> withUrl(String protocol, String host, int port) {
+        String url = String.format("netty:%s://%s:%s", protocol, host, port);
+        return new EndpointUrlBuilder<>(this::withSinkUrl, url);
+    }
+
+    /**
+     * Creates a factory pre-populated with the connector name, a task maximum
+     * of 1, the Netty sink connector class and {@code StringConverter} for both
+     * keys and values.
+     *
+     * @return the pre-populated factory
+     */
+    public static CamelNettyPropertyFactory basic() {
+        return new CamelNettyPropertyFactory()
+                .withName("CamelNettySinkConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.netty.CamelNettySinkConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java
new file mode 100644
index 0000000..bb08243
--- /dev/null
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.netty.sink;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Integration test for the Netty sink connector. A plain {@link ServerSocket}
+ * plays the role of the remote peer: it accepts one connection on {@link #PORT},
+ * reads a single line and the test compares that line with the expected message.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSinkNettyITCase extends CamelSinkTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkNettyITCase.class);
+    private static final int PORT = NetworkUtils.getFreePort("localhost");
+
+    private String topicName;
+
+    private final int expect = 1;
+    // Written by consumeMessages and read by verifyMessages.
+    private volatile String received;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-netty-kafka-connector"};
+    }
+
+    /** Picks the topic for the current test and clears the previously received line. */
+    @BeforeEach
+    public void setUp() {
+        topicName = getTopicForTest(this);
+        received = null;
+    }
+
+    /**
+     * Accepts one connection on {@link #PORT}, reads one line from it into
+     * {@code received} and counts the latch down whether or not reading succeeded.
+     *
+     * @param latch the latch released once the read attempt has finished
+     */
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try (ServerSocket listener = new ServerSocket(PORT);
+             Socket connection = listener.accept();
+             InputStream in = connection.getInputStream();
+             BufferedReader lines = new BufferedReader(new InputStreamReader(in))) {
+            String line = lines.readLine();
+            received = line;
+            LOG.debug("Received: {}", line);
+        } catch (IOException e) {
+            LOG.error("Unable to receive messages: {}", e.getMessage(), e);
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    /**
+     * Waits up to 30 seconds for the latch and then checks the received line.
+     *
+     * @param latch the latch released by {@link #consumeMessages(CountDownLatch)}
+     * @throws InterruptedException if the wait on the latch is interrupted
+     */
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (!latch.await(30, TimeUnit.SECONDS)) {
+            fail("Failed to receive the messages within the specified time");
+        }
+        assertEquals("Sink test message 0", received, "Received message content differed");
+    }
+
+    /** Runs the sink test with the endpoint configured through individual properties. */
+    @Test
+    @Timeout(30)
+    public void testBasicSendReceive() throws Exception {
+        ConnectorPropertyFactory factory = CamelNettyPropertyFactory.basic()
+                .withTopics(topicName)
+                .withProtocol("tcp")
+                // TODO https://github.com/apache/camel-kafka-connector/issues/924
+                .withHost("//localhost")
+                .withPort(PORT)
+                // close the connection after sending; otherwise the mock server socket stays held
+                .withDisconnect(true)
+                // the mock server never replies, so send one-way
+                .withSync(false);
+
+        runTest(factory, topicName, expect);
+    }
+
+    /** Runs the sink test with the endpoint configured through a single URL. */
+    @Test
+    @Timeout(30)
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        ConnectorPropertyFactory factory = CamelNettyPropertyFactory.basic()
+                .withTopics(topicName)
+                .withUrl("tcp", "localhost", PORT)
+                // close the connection after sending; otherwise the mock server socket stays held
+                .append("disconnect", "true")
+                // the mock server never replies, so send one-way
+                .append("sync", "false")
+                .buildUrl();
+
+        runTest(factory, topicName, expect);
+    }
+}
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
index 6c76789..5384e22 100644
--- a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
@@ -25,7 +25,6 @@ import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
 import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
@@ -80,13 +79,13 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
 
     @Test
     @Timeout(30)
-    @Disabled("Camel-Netty-* connectors are not working #924")
     public void testLaunchConnector() throws ExecutionException, InterruptedException {
         CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
                 .basic()
                 .withKafkaTopic(topicName)
                 .withProtocol("tcp")
-                .withHost("localhost")
+                // TODO https://github.com/apache/camel-kafka-connector/issues/924
+                .withHost("//localhost")
                 .withPort(PORT)
                 // one-way as test client doesn't receive response
                 .withSync(false);


[camel-kafka-connector] 01/02: Add Netty source itest #1036

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch backport-netty-itests
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 4c215981a84ae8103f059b1b35d1482970d7a874
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Thu Feb 25 23:17:26 2021 +0900

    Add Netty source itest #1036
---
 .../common/test/CamelSourceTestSupport.java        |  15 +++
 tests/itests-netty/pom.xml                         |  45 +++++++++
 .../netty/source/CamelNettyPropertyFactory.java    |  57 +++++++++++
 .../netty/source/CamelSourceNettyITCase.java       | 110 +++++++++++++++++++++
 tests/pom.xml                                      |   1 +
 5 files changed, 228 insertions(+)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
index 5bb6a93..016525a 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -88,6 +88,21 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
      * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
      *
      * @param connectorPropertyFactory A factory for connector properties
+     * @param topic the topic to send the messages to
+     * @param count the number of messages to send
+     * @throws Exception For test-specific exceptions
+     */
+    public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws ExecutionException, InterruptedException {
+        // Client bound to the bootstrap servers of the Kafka service used by this test.
+        KafkaClient<String, String> client = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        // Hand over to the consumer-based runner with a string consumer built for the topic and count.
+        runTestBlocking(connectorPropertyFactory, new StringMessageConsumer(client, topic, count));
+    }
+
+    /**
+     * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+     *
+     * @param connectorPropertyFactory A factory for connector properties
      * @param consumer A Kafka consumer consumer for the test messages
      * @throws Exception For test-specific exceptions
      */
diff --git a/tests/itests-netty/pom.xml b/tests/itests-netty/pom.xml
new file mode 100644
index 0000000..1ef8735
--- /dev/null
+++ b/tests/itests-netty/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-netty</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: Netty</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-netty</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java
new file mode 100644
index 0000000..d0f6132
--- /dev/null
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.netty.source;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+/**
+ * Property factory for the Netty source connector used by the integration
+ * tests in this package.
+ *
+ * <p>The {@code with*} setters each store a single connector property through
+ * {@code setProperty} and return a factory of this type so calls can be chained.
+ * The constructor is private; instances are obtained from {@link #basic()}.
+ */
+final class CamelNettyPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyPropertyFactory> {
+
+    // Not instantiable from outside; use basic().
+    private CamelNettyPropertyFactory() {
+    }
+
+    /**
+     * Stores the given value under {@code camel.source.path.protocol}.
+     *
+     * @param value the protocol part of the endpoint path
+     * @return the factory returned by {@code setProperty}, for chaining
+     */
+    public CamelNettyPropertyFactory withProtocol(String value) {
+        return setProperty("camel.source.path.protocol", value);
+    }
+
+    /**
+     * Stores the given value under {@code camel.source.path.host}.
+     *
+     * @param value the host part of the endpoint path
+     * @return the factory returned by {@code setProperty}, for chaining
+     */
+    public CamelNettyPropertyFactory withHost(String value) {
+        return setProperty("camel.source.path.host", value);
+    }
+
+    /**
+     * Stores the given value under {@code camel.source.path.port}.
+     *
+     * @param value the port part of the endpoint path
+     * @return the factory returned by {@code setProperty}, for chaining
+     */
+    public CamelNettyPropertyFactory withPort(int value) {
+        return setProperty("camel.source.path.port", value);
+    }
+
+    /**
+     * Stores the given flag under {@code camel.source.endpoint.sync}.
+     *
+     * @param value the value of the endpoint's {@code sync} option
+     * @return the factory returned by {@code setProperty}, for chaining
+     */
+    public CamelNettyPropertyFactory withSync(boolean value) {
+        return setProperty("camel.source.endpoint.sync", value);
+    }
+
+    /**
+     * Starts a URL-based configuration of the form
+     * {@code netty:<protocol>://<host>:<port>}.
+     *
+     * @param protocol the protocol placed after {@code netty:}
+     * @param host the host placed after {@code ://}
+     * @param port the port placed after the host
+     * @return an {@link EndpointUrlBuilder} created with {@code this::withSourceUrl} and the formatted URL
+     */
+    public EndpointUrlBuilder<CamelNettyPropertyFactory> withUrl(String protocol, String host, int port) {
+        String url = String.format("netty:%s://%s:%s", protocol, host, port);
+        return new EndpointUrlBuilder<>(this::withSourceUrl, url);
+    }
+
+    /**
+     * Creates a factory pre-populated with the connector name, a task maximum
+     * of 1, the Netty source connector class and {@code StringConverter} for
+     * both keys and values.
+     *
+     * @return the pre-populated factory
+     */
+    public static CamelNettyPropertyFactory basic() {
+        return new CamelNettyPropertyFactory()
+                .withName("CamelNettySourceConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.netty.CamelNettySourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
new file mode 100644
index 0000000..6c76789
--- /dev/null
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.netty.source;
+
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Integration test for the Netty source connector. A plain TCP client writes
+ * one message to the connector's endpoint on {@link #PORT}; the test then
+ * checks the messages reported by the test consumer.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSourceNettyITCase extends CamelSourceTestSupport {
+    private static final int PORT = NetworkUtils.getFreePort("localhost");
+
+    private final int expect = 1;
+    private String topicName;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-netty-kafka-connector"};
+    }
+
+    /** Picks the topic for the current test. */
+    @BeforeEach
+    public void setUp() {
+        topicName = getTopicForTest(this);
+    }
+
+    /**
+     * Waits three seconds and then sends the test message over TCP.
+     *
+     * <p>If the wait is interrupted the interrupt flag is restored (instead of
+     * being silently discarded) and the message is still sent, as before.
+     */
+    @Override
+    protected void produceTestData() {
+        try {
+            // TODO necessary to wait for ckc netty endpoint to be up and ready
+            Thread.sleep(3000);
+        } catch (InterruptedException e) {
+            // Keep the interruption visible to the caller rather than swallowing it.
+            Thread.currentThread().interrupt();
+        }
+        sendMessage();
+    }
+
+    /**
+     * Opens a socket to {@code localhost:PORT}, writes {@code "Hello CKC!"} and
+     * flushes. Any exception fails the test with the exception as cause.
+     */
+    void sendMessage() {
+        try (Socket s = new Socket("localhost", PORT);
+             PrintWriter out = new PrintWriter(s.getOutputStream())) {
+            out.print("Hello CKC!");
+            out.flush();
+        } catch (Exception e) {
+            fail(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Checks that exactly {@code expect} messages were consumed and that the
+     * first one carries the text sent by {@link #sendMessage()}.
+     *
+     * @param consumer the consumer holding the messages collected during the test
+     */
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
+        // Assert the count before touching element 0, so that an empty result is
+        // reported through this assertion message and not as a failure of get(0).
+        assertEquals(expect, received, "Did not receive as many messages as expected");
+
+        Object receivedObject = consumer.consumedMessages().get(0).value();
+        assertEquals("Hello CKC!", receivedObject, "Received message content differed");
+    }
+
+    /** Launches the connector configured through individual properties (currently disabled). */
+    @Test
+    @Timeout(30)
+    @Disabled("Camel-Netty-* connectors are not working #924")
+    public void testLaunchConnector() throws ExecutionException, InterruptedException {
+        CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withProtocol("tcp")
+                .withHost("localhost")
+                .withPort(PORT)
+                // one-way as test client doesn't receive response
+                .withSync(false);
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
+    }
+
+    /** Launches the connector configured through a single endpoint URL. */
+    @Test
+    @Timeout(30)
+    public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException {
+        CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withUrl("tcp", "localhost", PORT)
+                // one-way as test client doesn't receive response
+                .append("sync", "false")
+                .buildUrl();
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
+    }
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index 26aa43f..b1ec328 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -62,6 +62,7 @@
         <module>itests-ssh</module>
         <module>itests-sql</module>
         <module>itests-cxf</module>
+        <module>itests-netty</module>
     </modules>