Posted to commits@camel.apache.org by or...@apache.org on 2021/03/03 16:58:03 UTC

[camel-kafka-connector] branch camel-master updated (fabc832 -> 304215e)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from fabc832  Re-enables the basic test for couchbase since it was fixed on CAMEL-16284
     new b253e13  Add Netty source itest #1036
     new 5bc30a0  Add netty sink itest #1036
     new 0ee8710  Remove duplicate run test method
     new 758bf1e  Fix Netty tests reliability
     new 3d96ebc  Remove duplicate slashes (//) to comply with recent netty URI changes on Camel (ref: CAMEL-16280)
     new f8df2de  Replace the sleep w/ the reusable test utilities for waiting for the port to be open
     new 304215e  Remove duplicate slashes (//) on Netty tests

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../common/test/CamelSourceTestSupport.java        |  31 +++---
 tests/{itests-timer => itests-netty}/pom.xml       |   8 +-
 .../netty/sink/CamelNettyPropertyFactory.java}     |  38 ++++---
 .../netty/sink/CamelSinkNettyITCase.java}          | 110 ++++++++-------------
 .../netty/source/CamelNettyPropertyFactory.java}   |  43 ++++----
 .../netty/source/CamelSourceNettyITCase.java}      |  65 +++++++-----
 .../syslog/sink/CamelSyslogPropertyFactory.java    |   2 +-
 .../syslog/source/CamelSyslogPropertyFactory.java  |   2 +-
 tests/pom.xml                                      |   1 +
 9 files changed, 151 insertions(+), 149 deletions(-)
 copy tests/{itests-timer => itests-netty}/pom.xml (91%)
 copy tests/{itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java => itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelNettyPropertyFactory.java} (52%)
 copy tests/{itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java => itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java} (50%)
 copy tests/{itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelJMSPropertyFactory.java => itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java} (55%)
 copy tests/{itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java => itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java} (50%)


[camel-kafka-connector] 07/07: Remove duplicate slashes (//) on Netty tests

Posted by or...@apache.org.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 304215e6f462b2632b067e4d26ac5af3359e510a
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Mar 3 16:35:41 2021 +0100

    Remove duplicate slashes (//) on Netty tests
    
    They were added to work-around CAMEL-16280
---
 .../apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java    | 2 +-
 .../camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java       | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java
index 20b6814..19cc77c 100644
--- a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java
@@ -90,7 +90,7 @@ public class CamelSinkNettyITCase extends CamelSinkTestSupport {
                 .withTopics(topicName)
                 .withProtocol("tcp")
                 // TODO https://github.com/apache/camel-kafka-connector/issues/924
-                .withHost("//" + NetworkUtils.getHostname())
+                .withHost(NetworkUtils.getHostname())
                 .withPort(port)
                 // disconnect so that it won't keep mock server socket forever
                 .withDisconnect(true)
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
index 2d76e27..30d9926 100644
--- a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
@@ -81,7 +81,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
                 .withKafkaTopic(topicName)
                 .withProtocol("tcp")
                 // TODO https://github.com/apache/camel-kafka-connector/issues/924
-                .withHost("//" + NetworkUtils.getHostname())
+                .withHost(NetworkUtils.getHostname())
                 .withPort(port)
                 // one-way as test client doesn't receive response
                 .withSync(false);
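
Note on the change above: the sketch below is only an illustration of why a host value that carries its own "//" prefix becomes a problem once the URI already supplies the slashes. It reuses the format string from withUrl(...) in CamelNettyPropertyFactory; the class name NettyUriSketch and the port number are hypothetical, and the way the connector itself assembles the endpoint URI from the path properties is not shown in these commits, so this is an assumption about the effect, not a copy of Camel's logic.

```java
class NettyUriSketch {
    // Same template as withUrl(...) in the test property factories of this thread.
    static String buildUri(String protocol, String host, int port) {
        return String.format("netty:%s://%s:%s", protocol, host, port);
    }

    public static void main(String[] args) {
        // Plain host: the template supplies the only "//".
        System.out.println(buildUri("tcp", "localhost", 5150));
        // Host with the old work-around prefix: the slashes are doubled.
        System.out.println(buildUri("tcp", "//localhost", 5150));
    }
}
```

With the plain host the result is netty:tcp://localhost:5150; with the prefixed host it is netty:tcp:////localhost:5150, which is the kind of duplication the commit removes.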


[camel-kafka-connector] 02/07: Add netty sink itest #1036

Posted by or...@apache.org.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 5bc30a032b0def5490f3f7859a1062f7e681df6f
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Tue Mar 2 11:53:49 2021 +0900

    Add netty sink itest #1036
---
 .../netty/sink/CamelNettyPropertyFactory.java      |  61 +++++++++++
 .../netty/sink/CamelSinkNettyITCase.java           | 119 +++++++++++++++++++++
 .../netty/source/CamelSourceNettyITCase.java       |   5 +-
 3 files changed, 182 insertions(+), 3 deletions(-)

diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelNettyPropertyFactory.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelNettyPropertyFactory.java
new file mode 100644
index 0000000..31b5343
--- /dev/null
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelNettyPropertyFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.netty.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+final class CamelNettyPropertyFactory extends SinkConnectorPropertyFactory<CamelNettyPropertyFactory> {
+
+    private CamelNettyPropertyFactory() {
+    }
+
+    public CamelNettyPropertyFactory withProtocol(String value) {
+        return setProperty("camel.sink.path.protocol", value);
+    }
+
+    public CamelNettyPropertyFactory withHost(String value) {
+        return setProperty("camel.sink.path.host", value);
+    }
+
+    public CamelNettyPropertyFactory withPort(int value) {
+        return setProperty("camel.sink.path.port", value);
+    }
+
+    public CamelNettyPropertyFactory withDisconnect(boolean value) {
+        return setProperty("camel.sink.endpoint.disconnect", value);
+    }
+
+    public CamelNettyPropertyFactory withSync(boolean value) {
+        return setProperty("camel.sink.endpoint.sync", value);
+    }
+
+    public EndpointUrlBuilder<CamelNettyPropertyFactory> withUrl(String protocol, String host, int port) {
+        String url = String.format("netty:%s://%s:%s", protocol, host, port);
+        return new EndpointUrlBuilder<>(this::withSinkUrl, url);
+    }
+
+    public static CamelNettyPropertyFactory basic() {
+        return new CamelNettyPropertyFactory()
+                .withName("CamelNettySinkConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.netty.CamelNettySinkConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java
new file mode 100644
index 0000000..bb08243
--- /dev/null
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.netty.sink;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSinkNettyITCase extends CamelSinkTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkNettyITCase.class);
+    private static final int PORT = NetworkUtils.getFreePort("localhost");
+
+    private String topicName;
+
+    private final int expect = 1;
+    private volatile String received;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-netty-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        topicName = getTopicForTest(this);
+        received = null;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try (ServerSocket serverSocket = new ServerSocket(PORT);
+             Socket socket = serverSocket.accept();
+             InputStream is = socket.getInputStream();
+             BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
+            received = reader.readLine();
+            LOG.debug("Received: {}", received);
+        } catch (IOException e) {
+            LOG.error("Unable to receive messages: {}", e.getMessage(), e);
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        String expected = "Sink test message 0";
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            assertEquals(expected, received, "Received message content differed");
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
+    @Test
+    @Timeout(30)
+    public void testBasicSendReceive() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory.basic()
+                .withTopics(topicName)
+                .withProtocol("tcp")
+                // TODO https://github.com/apache/camel-kafka-connector/issues/924
+                .withHost("//localhost")
+                .withPort(PORT)
+                // disconnect so that it won't keep mock server socket forever
+                .withDisconnect(true)
+                // one-way as mock server doesn't send replies
+                .withSync(false);
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Test
+    @Timeout(30)
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory.basic()
+                .withTopics(topicName)
+                .withUrl("tcp", "localhost", PORT)
+                // disconnect so that it won't keep mock server socket forever
+                .append("disconnect", "true")
+                // one-way as mock server doesn't send replies
+                .append("sync", "false")
+                .buildUrl();
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+}
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
index 6c76789..5384e22 100644
--- a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
@@ -25,7 +25,6 @@ import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
 import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
@@ -80,13 +79,13 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
 
     @Test
     @Timeout(30)
-    @Disabled("Camel-Netty-* connectors are not working #924")
     public void testLaunchConnector() throws ExecutionException, InterruptedException {
         CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
                 .basic()
                 .withKafkaTopic(topicName)
                 .withProtocol("tcp")
-                .withHost("localhost")
+                // TODO https://github.com/apache/camel-kafka-connector/issues/924
+                .withHost("//localhost")
                 .withPort(PORT)
                 // one-way as test client doesn't receive response
                 .withSync(false);
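
Note on the sink test above: consumeMessages(...) acts as a one-shot mock TCP server that accepts a single connection, reads one line, and releases a latch. The standalone sketch below shows the same pattern without Kafka or the connector, assuming a plain socket client stands in for the Netty sink. The class and method names (OneShotTcpServerSketch, roundTrip) are hypothetical and not part of the repository.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class OneShotTcpServerSketch {
    // Sends one line to a one-shot server on the loopback interface and
    // returns what the server read.
    static String roundTrip(String message) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        CountDownLatch latch = new CountDownLatch(1);
        String[] received = new String[1];

        // Port 0 asks the OS for any free port.
        try (ServerSocket serverSocket = new ServerSocket(0, 1, loopback)) {
            int port = serverSocket.getLocalPort();

            Thread server = new Thread(() -> {
                try (Socket socket = serverSocket.accept();
                     BufferedReader reader = new BufferedReader(
                             new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
                    received[0] = reader.readLine();
                } catch (IOException e) {
                    // Leave received[0] as null; the caller sees the failure.
                } finally {
                    // Always release the waiter, as the test above does.
                    latch.countDown();
                }
            });
            server.start();

            // Stand-in for the sink: write one line, then disconnect.
            try (Socket client = new Socket(loopback, port);
                 PrintWriter out = new PrintWriter(
                         new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8), true)) {
                out.println(message);
            }

            if (!latch.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("No message received within the timeout");
            }
        }
        return received[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("Sink test message 0"));
    }
}
```

Counting the latch down in the finally block means the verifying side is released even when the read fails, so a failure shows up as a content mismatch instead of a hang until the timeout.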


[camel-kafka-connector] 05/07: Remove duplicate slashes (//) to comply with recent netty URI changes on Camel (ref: CAMEL-16280)

Posted by or...@apache.org.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 3d96ebc214db50cb8d78d078b6309605b82a9309
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Mar 3 14:17:46 2021 +0100

    Remove duplicate slashes (//) to comply with recent netty URI changes on Camel (ref: CAMEL-16280)
---
 .../camel/kafkaconnector/syslog/sink/CamelSyslogPropertyFactory.java    | 2 +-
 .../camel/kafkaconnector/syslog/source/CamelSyslogPropertyFactory.java  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSyslogPropertyFactory.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSyslogPropertyFactory.java
index 4d3ce24..5e41104 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSyslogPropertyFactory.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSyslogPropertyFactory.java
@@ -29,7 +29,7 @@ final class CamelSyslogPropertyFactory extends SinkConnectorPropertyFactory<Came
     }
 
     public CamelSyslogPropertyFactory withHost(String host) {
-        return setProperty("camel.sink.path.host", "//" + host);
+        return setProperty("camel.sink.path.host", host);
     }
 
     public CamelSyslogPropertyFactory withPort(int port) {
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSyslogPropertyFactory.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSyslogPropertyFactory.java
index 5e4cd2a..bf438fd 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSyslogPropertyFactory.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSyslogPropertyFactory.java
@@ -29,7 +29,7 @@ final class CamelSyslogPropertyFactory extends SourceConnectorPropertyFactory<Ca
     }
 
     public CamelSyslogPropertyFactory withHost(String host) {
-        return setProperty("camel.source.path.host", "//" + host);
+        return setProperty("camel.source.path.host", host);
     }
 
     public CamelSyslogPropertyFactory withPort(int port) {
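
Note on the factories touched above: they follow a self-typed fluent pattern, where setProperty(...) returns the concrete factory type so calls can be chained. The sketch below is a minimal, hypothetical reconstruction of that pattern for illustration only. PropertyFactorySketch and SyslogSinkFactorySketch are invented names, the real SinkConnectorPropertyFactory is not shown in this thread, and the port key is borrowed from the Netty sink factory as an assumption.

```java
import java.util.Properties;

// Hypothetical stand-in for the connector property factory base class.
abstract class PropertyFactorySketch<T extends PropertyFactorySketch<T>> {
    private final Properties properties = new Properties();

    @SuppressWarnings("unchecked")
    protected T setProperty(String name, Object value) {
        properties.setProperty(name, String.valueOf(value));
        // Returning the concrete subtype is what keeps the chain fluent.
        return (T) this;
    }

    Properties getProperties() {
        return properties;
    }
}

final class SyslogSinkFactorySketch extends PropertyFactorySketch<SyslogSinkFactorySketch> {
    SyslogSinkFactorySketch withHost(String host) {
        // After the change above, the host is stored without a "//" prefix.
        return setProperty("camel.sink.path.host", host);
    }

    SyslogSinkFactorySketch withPort(int port) {
        // Key name assumed by analogy with the Netty sink factory.
        return setProperty("camel.sink.path.port", port);
    }

    public static void main(String[] args) {
        Properties p = new SyslogSinkFactorySketch()
                .withHost("localhost")
                .withPort(5150)
                .getProperties();
        System.out.println(p.getProperty("camel.sink.path.host"));
    }
}
```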


[camel-kafka-connector] 01/07: Add Netty source itest #1036

Posted by or...@apache.org.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit b253e13a83ee65b9011ce1fb4dc5ec20cc577eca
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Thu Feb 25 23:17:26 2021 +0900

    Add Netty source itest #1036
---
 .../common/test/CamelSourceTestSupport.java        |  15 +++
 tests/itests-netty/pom.xml                         |  45 +++++++++
 .../netty/source/CamelNettyPropertyFactory.java    |  57 +++++++++++
 .../netty/source/CamelSourceNettyITCase.java       | 110 +++++++++++++++++++++
 tests/pom.xml                                      |   1 +
 5 files changed, 228 insertions(+)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
index 5bb6a93..016525a 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -88,6 +88,21 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
      * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
      *
      * @param connectorPropertyFactory A factory for connector properties
+     * @param topic the topic to send the messages to
+     * @param count the number of messages to send
+     * @throws Exception For test-specific exceptions
+     */
+    public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws ExecutionException, InterruptedException {
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        StringMessageConsumer consumer = new StringMessageConsumer(kafkaClient, topic, count);
+
+        runTestBlocking(connectorPropertyFactory, consumer);
+    }
+
+    /**
+     * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+     *
+     * @param connectorPropertyFactory A factory for connector properties
      * @param consumer A Kafka consumer consumer for the test messages
      * @throws Exception For test-specific exceptions
      */
diff --git a/tests/itests-netty/pom.xml b/tests/itests-netty/pom.xml
new file mode 100644
index 0000000..1ef8735
--- /dev/null
+++ b/tests/itests-netty/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-netty</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: Netty</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-netty</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java
new file mode 100644
index 0000000..d0f6132
--- /dev/null
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.netty.source;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+final class CamelNettyPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyPropertyFactory> {
+
+    private CamelNettyPropertyFactory() {
+    }
+
+    public CamelNettyPropertyFactory withProtocol(String value) {
+        return setProperty("camel.source.path.protocol", value);
+    }
+
+    public CamelNettyPropertyFactory withHost(String value) {
+        return setProperty("camel.source.path.host", value);
+    }
+
+    public CamelNettyPropertyFactory withPort(int value) {
+        return setProperty("camel.source.path.port", value);
+    }
+
+    public CamelNettyPropertyFactory withSync(boolean value) {
+        return setProperty("camel.source.endpoint.sync", value);
+    }
+
+    public EndpointUrlBuilder<CamelNettyPropertyFactory> withUrl(String protocol, String host, int port) {
+        String url = String.format("netty:%s://%s:%s", protocol, host, port);
+        return new EndpointUrlBuilder<>(this::withSourceUrl, url);
+    }
+
+    public static CamelNettyPropertyFactory basic() {
+        return new CamelNettyPropertyFactory()
+                .withName("CamelNettySourceConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.netty.CamelNettySourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
new file mode 100644
index 0000000..6c76789
--- /dev/null
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.netty.source;
+
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSourceNettyITCase extends CamelSourceTestSupport {
+    private static final int PORT = NetworkUtils.getFreePort("localhost");
+
+    private final int expect = 1;
+    private String topicName;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-netty-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        topicName = getTopicForTest(this);
+    }
+
+    @Override
+    protected void produceTestData() {
+        try {
+            // TODO necessary to wait for ckc netty endpoint to be up and ready
+            Thread.sleep(3000);
+        } catch (Exception ignored) {
+        }
+        sendMessage();
+    }
+
+    void sendMessage() {
+        try (Socket s = new Socket("localhost", PORT);
+             PrintWriter out = new PrintWriter(s.getOutputStream())) {
+            out.print("Hello CKC!");
+            out.flush();
+        } catch (Exception e) {
+            fail(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
+        Object receivedObject = consumer.consumedMessages().get(0).value();
+        assertEquals(expect, received, "Did not receive as many messages as expected");
+        assertEquals("Hello CKC!", receivedObject, "Received message content differed");
+    }
+
+    @Test
+    @Timeout(30)
+    @Disabled("Camel-Netty-* connectors are not working #924")
+    public void testLaunchConnector() throws ExecutionException, InterruptedException {
+        CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withProtocol("tcp")
+                .withHost("localhost")
+                .withPort(PORT)
+                // one-way as test client doesn't receive response
+                .withSync(false);
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Test
+    @Timeout(30)
+    public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException {
+        CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withUrl("tcp", "localhost", PORT)
+                // one-way as test client doesn't receive response
+                .append("sync", "false")
+                .buildUrl();
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
+    }
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index 26aa43f..b1ec328 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -62,6 +62,7 @@
         <module>itests-ssh</module>
         <module>itests-sql</module>
         <module>itests-cxf</module>
+        <module>itests-netty</module>
     </modules>
 
 


[camel-kafka-connector] 06/07: Replace the sleep w/ the reusable test utilities for waiting for the port to be open

Posted by or...@apache.org.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit f8df2de57c1b39a704c4f4ece8705dc028ab6266
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Mar 3 15:25:33 2021 +0100

    Replace the sleep w/ the reusable test utilities for waiting for the port to be open
---
 .../kafkaconnector/netty/source/CamelSourceNettyITCase.java  | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
index b2ef5ee..2d76e27 100644
--- a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
 import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
 import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -49,11 +50,8 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
 
     @Override
     protected void produceTestData() {
-        try {
-            // TODO necessary to wait for ckc netty endpoint to be up and ready
-            Thread.sleep(3000);
-        } catch (Exception ignored) {
-        }
+        TestUtils.waitFor(() -> NetworkUtils.portIsOpen(NetworkUtils.getHostname(), port));
+
         sendMessage();
     }
 
@@ -76,7 +74,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
     }
 
     @Test
-    @Timeout(30)
+    @Timeout(35)
     public void testLaunchConnector() throws ExecutionException, InterruptedException {
         CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
                 .basic()
@@ -92,7 +90,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
     }
 
     @Test
-    @Timeout(30)
+    @Timeout(35)
     public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException {
         CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
                 .basic()
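
Note on the change above: the fixed 3-second sleep is replaced by polling until the connector's port accepts connections. The following is a minimal, stand-alone sketch of that idea, not the project's implementation. The class PortWaitSketch and both helper methods are hypothetical stand-ins; the real TestUtils.waitFor and NetworkUtils.portIsOpen may have different signatures, timeouts, and behavior.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.function.BooleanSupplier;

public class PortWaitSketch {
    // Hypothetical stand-in for NetworkUtils.portIsOpen:
    // returns true if a TCP connection to host:port succeeds within 500 ms.
    static boolean portIsOpen(String host, int port) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 500);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Hypothetical stand-in for TestUtils.waitFor:
    // polls the condition until it holds or the timeout elapses.
    static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long intervalMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() >= deadline) {
                return false;
            }
            Thread.sleep(intervalMillis);
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        // A listening socket on an OS-assigned port plays the role of the endpoint under test.
        try (ServerSocket server = new ServerSocket(0)) {
            int port = server.getLocalPort();
            boolean ready = waitFor(() -> portIsOpen("127.0.0.1", port), 5000, 100);
            System.out.println("port open: " + ready);
        }
    }
}
```

Polling returns as soon as the endpoint is reachable, so a fast start does not pay the full fixed delay and a slow start is not cut off at an arbitrary 3 seconds.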


[camel-kafka-connector] 04/07: Fix Netty tests reliability

Posted by or...@apache.org.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 758bf1eb548e70e6469bcbe10ec2090bdb7a7b0c
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Mar 3 10:06:49 2021 +0100

    Fix Netty tests reliability
    
    - avoid per class lifecycle since it seems to cause the code to try to bind the same port multiple times
    - do not assume local hostnames and/or that they are correctly configured
---
 .../kafkaconnector/netty/sink/CamelSinkNettyITCase.java      | 12 +++++-------
 .../kafkaconnector/netty/source/CamelSourceNettyITCase.java  | 12 +++++-------
 2 files changed, 10 insertions(+), 14 deletions(-)

diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java
index bb08243..20b6814 100644
--- a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java
@@ -31,7 +31,6 @@ import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,10 +38,9 @@ import org.slf4j.LoggerFactory;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
-@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkNettyITCase extends CamelSinkTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkNettyITCase.class);
-    private static final int PORT = NetworkUtils.getFreePort("localhost");
+    private final int port = NetworkUtils.getFreePort();
 
     private String topicName;
 
@@ -62,7 +60,7 @@ public class CamelSinkNettyITCase extends CamelSinkTestSupport {
 
     @Override
     protected void consumeMessages(CountDownLatch latch) {
-        try (ServerSocket serverSocket = new ServerSocket(PORT);
+        try (ServerSocket serverSocket = new ServerSocket(port);
              Socket socket = serverSocket.accept();
              InputStream is = socket.getInputStream();
              BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
@@ -92,8 +90,8 @@ public class CamelSinkNettyITCase extends CamelSinkTestSupport {
                 .withTopics(topicName)
                 .withProtocol("tcp")
                 // TODO https://github.com/apache/camel-kafka-connector/issues/924
-                .withHost("//localhost")
-                .withPort(PORT)
+                .withHost("//" + NetworkUtils.getHostname())
+                .withPort(port)
                 // disconnect so that it won't keep mock server socket forever
                 .withDisconnect(true)
                 // one-way as mock server doesn't send replies
@@ -107,7 +105,7 @@ public class CamelSinkNettyITCase extends CamelSinkTestSupport {
     public void testBasicSendReceiveUsingUrl() throws Exception {
         ConnectorPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory.basic()
                 .withTopics(topicName)
-                .withUrl("tcp", "localhost", PORT)
+                .withUrl("tcp", NetworkUtils.getHostname(), port)
                 // disconnect so that it won't keep mock server socket forever
                 .append("disconnect", "true")
                 // one-way as mock server doesn't send replies
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
index 5384e22..b2ef5ee 100644
--- a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
@@ -26,15 +26,13 @@ import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
-@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSourceNettyITCase extends CamelSourceTestSupport {
-    private static final int PORT = NetworkUtils.getFreePort("localhost");
+    private final int port = NetworkUtils.getFreePort();
 
     private final int expect = 1;
     private String topicName;
@@ -60,7 +58,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
     }
 
     void sendMessage() {
-        try (Socket s = new Socket("localhost", PORT);
+        try (Socket s = new Socket(NetworkUtils.getHostname(), port);
              PrintWriter out = new PrintWriter(s.getOutputStream())) {
             out.print("Hello CKC!");
             out.flush();
@@ -85,8 +83,8 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
                 .withKafkaTopic(topicName)
                 .withProtocol("tcp")
                 // TODO https://github.com/apache/camel-kafka-connector/issues/924
-                .withHost("//localhost")
-                .withPort(PORT)
+                .withHost("//" + NetworkUtils.getHostname())
+                .withPort(port)
                 // one-way as test client doesn't receive response
                 .withSync(false);
 
@@ -99,7 +97,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
         CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
                 .basic()
                 .withKafkaTopic(topicName)
-                .withUrl("tcp", "localhost", PORT)
+                .withUrl("tcp", NetworkUtils.getHostname(), port)
                 // one-way as test client doesn't receive response
                 .append("sync", "false")
                 .buildUrl();
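
Note on the change above: with JUnit 5's default per-method lifecycle, a new test instance is created for each test method, so an instance field such as "port" is initialized again for every test, whereas a static field is initialized once per class. The sketch below illustrates only that difference in plain Java, without JUnit. The class FreePortSketch and the freePort helper are hypothetical stand-ins; the real NetworkUtils.getFreePort may work differently.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class FreePortSketch {
    // Hypothetical stand-in for NetworkUtils.getFreePort:
    // asks the OS for an ephemeral port, then releases it immediately.
    static int freePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new IllegalStateException("no free port available", e);
        }
    }

    // Mirrors the "private final int port" field in the tests:
    // the port is looked up once per instance, not once per class.
    static class PerInstance {
        private final int port = freePort();

        int port() {
            return port;
        }
    }

    public static void main(String[] args) {
        PerInstance first = new PerInstance();
        PerInstance second = new PerInstance();
        // The OS usually hands out different numbers, but that is not guaranteed.
        System.out.println("first instance port:  " + first.port());
        System.out.println("second instance port: " + second.port());
    }
}
```

Because the port is released before it is used, another process could still claim it in between; the lookup reduces collisions between tests but does not rule them out.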


[camel-kafka-connector] 03/07: Remove duplicate run test method

Posted by or...@apache.org.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 0ee8710f8c9026531154c74b83742c404c24421d
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Mar 3 10:12:15 2021 +0100

    Remove duplicate run test method
---
 .../common/test/CamelSourceTestSupport.java              | 16 ----------------
 1 file changed, 16 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
index 016525a..439799f 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -134,20 +134,4 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
         verifyMessages(consumer);
         LOG.debug("Verified messages");
     }
-
-    /**
-     * A simple test runner that follows the steps: initialize, start producer, consume messages, verify results
-     *
-     * @param connectorPropertyFactory A factory for connector properties
-     * @param topic the topic to send the messages to
-     * @param count the number of messages to send
-     * @throws Exception For test-specific exceptions
-     */
-    public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws ExecutionException, InterruptedException {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        StringMessageConsumer consumer = new StringMessageConsumer(kafkaClient, topic, count);
-
-        runTestBlocking(connectorPropertyFactory, consumer);
-    }
-
 }