You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ts...@apache.org on 2021/03/04 12:43:25 UTC

[camel-kafka-connector] branch master updated: Add netty-http itests #1036

This is an automated email from the ASF dual-hosted git repository.

tsato pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new c2d6a93  Add netty-http itests #1036
c2d6a93 is described below

commit c2d6a9323dc11bbd40215e09c7775a69ca8f472c
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Thu Mar 4 19:50:25 2021 +0900

    Add netty-http itests #1036
    
    netty-http source itests are disabled due to #969
---
 .../common/SinkConnectorPropertyFactory.java       |  12 +--
 .../common/SourceConnectorPropertyFactory.java     |  29 ++++--
 tests/itests-netty-http/pom.xml                    |  56 +++++++++++
 .../sink/CamelNettyhttpPropertyFactory.java        |  65 ++++++++++++
 .../nettyhttp/sink/CamelSinkNettyhttpITCase.java   | 111 +++++++++++++++++++++
 .../source/CamelNettyhttpPropertyFactory.java      |  63 ++++++++++++
 .../source/CamelSourceNettyhttpITCase.java}        |  62 ++++++------
 .../netty/source/CamelSourceNettyITCase.java       |  12 +--
 tests/itests-parent/pom.xml                        |  19 +++-
 tests/pom.xml                                      |   1 +
 10 files changed, 374 insertions(+), 56 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java
index 0684164..356ee0d 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java
@@ -17,17 +17,15 @@
 
 package org.apache.camel.kafkaconnector.common;
 
-public abstract class SinkConnectorPropertyFactory<T extends SinkConnectorPropertyFactory<T>>  extends BasicConnectorPropertyFactory<T> {
+import static org.apache.camel.kafkaconnector.CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF;
 
-    public T withTopics(String topics) {
-        getProperties().put("topics", topics);
+public abstract class SinkConnectorPropertyFactory<T extends SinkConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> {
 
-        return (T) this;
+    public T withTopics(String topics) {
+        return setProperty("topics", topics);
     }
 
     public T withSinkUrl(String sinkUrl) {
-        getProperties().put("camel.sink.url", sinkUrl);
-
-        return (T) this;
+        return setProperty(CAMEL_SINK_URL_CONF, sinkUrl);
     }
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java
index 684459c..aa59552 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java
@@ -17,25 +17,32 @@
 
 package org.apache.camel.kafkaconnector.common;
 
-public abstract class SourceConnectorPropertyFactory<T extends SourceConnectorPropertyFactory<T>>  extends BasicConnectorPropertyFactory<T> {
+import org.apache.camel.LoggingLevel;
 
-    public T withKafkaTopic(String topic) {
-        getProperties().put("topics", topic);
+import static org.apache.camel.kafkaconnector.CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME;
+import static org.apache.camel.kafkaconnector.CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF;
+import static org.apache.camel.kafkaconnector.CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF;
+import static org.apache.camel.kafkaconnector.CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF;
+import static org.apache.camel.kafkaconnector.CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF;
+import static org.apache.camel.kafkaconnector.CamelSourceConnectorConfig.TOPIC_CONF;
+
+public abstract class SourceConnectorPropertyFactory<T extends SourceConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> {
 
-        return (T) this;
+    public T withKafkaTopic(String topic) {
+        return setProperty(TOPIC_CONF, topic);
     }
 
     public T withSourceUrl(String sourceUrl) {
-        getProperties().put("camel.source.url", sourceUrl);
+        return setProperty(CAMEL_SOURCE_URL_CONF, sourceUrl);
+    }
 
-        return (T) this;
+    public T withSourceContentLogginglevel(LoggingLevel level) {
+        return setProperty(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, level.toString());
     }
 
     public T withAggregate(String aggregate, int size, int timeout) {
-        withBeans("aggregate", classRef(aggregate));
-        getProperties().put("camel.aggregation.size", size);
-        getProperties().put("camel.aggregation.timeout", timeout);
-
-        return (T) this;
+        return withBeans(CAMEL_CONNECTOR_AGGREGATE_NAME, classRef(aggregate))
+                .setProperty(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, size)
+                .setProperty(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, timeout);
     }
 }
diff --git a/tests/itests-netty-http/pom.xml b/tests/itests-netty-http/pom.xml
new file mode 100644
index 0000000..16238cf
--- /dev/null
+++ b/tests/itests-netty-http/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-netty-http</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: Netty HTTP</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-netty-http</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelNettyhttpPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelNettyhttpPropertyFactory.java
new file mode 100644
index 0000000..9754e7d
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelNettyhttpPropertyFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+final class CamelNettyhttpPropertyFactory extends SinkConnectorPropertyFactory<CamelNettyhttpPropertyFactory> {
+    // Fluent property builder for the netty-http sink connector tests; instances are obtained from basic().
+    private CamelNettyhttpPropertyFactory() {
+    }
+    // Each with* setter below passes one option to setProperty under its "camel.sink.*" key.
+    public CamelNettyhttpPropertyFactory withProtocol(String value) {
+        return setProperty("camel.sink.path.protocol", value);
+    }
+
+    public CamelNettyhttpPropertyFactory withHost(String value) {
+        return setProperty("camel.sink.path.host", value);
+    }
+
+    public CamelNettyhttpPropertyFactory withPort(int value) {
+        return setProperty("camel.sink.path.port", value);
+    }
+
+    public CamelNettyhttpPropertyFactory withPath(String value) {
+        return setProperty("camel.sink.path.path", value);
+    }
+
+    public CamelNettyhttpPropertyFactory withDisconnect(boolean value) {
+        return setProperty("camel.sink.endpoint.disconnect", value);
+    }
+
+    public CamelNettyhttpPropertyFactory withSync(boolean value) {
+        return setProperty("camel.sink.endpoint.sync", value);
+    }
+    // Formats "netty-http:<protocol>://<host>:<port>/<path>" and hands it to EndpointUrlBuilder with withSinkUrl as the setter.
+    public EndpointUrlBuilder<CamelNettyhttpPropertyFactory> withUrl(String protocol, String host, int port, String path) {
+        String url = String.format("netty-http:%s://%s:%s/%s", protocol, host, port, path);
+        return new EndpointUrlBuilder<>(this::withSinkUrl, url);
+    }
+    // Baseline configuration: connector name, a single task, the sink connector class and String key/value converters.
+    public static CamelNettyhttpPropertyFactory basic() {
+        return new CamelNettyhttpPropertyFactory()
+                .withName("CamelNettyhttpSinkConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSinkConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
new file mode 100644
index 0000000..cdf8b2c
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.sink;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkNettyhttpITCase.class);
+
+    private MockWebServer mockServer;
+
+    private String topicName;
+
+    private final int expect = 1;
+    private volatile RecordedRequest received;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-netty-http-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        topicName = getTopicForTest(this);
+        mockServer = new MockWebServer();
+        received = null;
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (mockServer != null) {
+            mockServer.shutdown();
+        }
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            received = mockServer.takeRequest();
+        } catch (InterruptedException e) {
+            LOG.error("Unable to receive messages: {}", e.getMessage(), e);
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        String expected = "Sink test message 0";
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            assertEquals("/test", received.getPath(), "Received path differed");
+            assertEquals(expected, received.getBody().readUtf8(), "Received message content differed");
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
+    @Test
+    @Timeout(30)
+    public void testBasicSendReceive() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
+                .withTopics(topicName)
+                .withProtocol("http")
+                .withHost(mockServer.getHostName())
+                .withPort(mockServer.getPort())
+                .withPath("test");
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Test
+    @Timeout(30)
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
+                .withTopics(topicName)
+                .withUrl("http", mockServer.getHostName(), mockServer.getPort(), "test")
+                .buildUrl();
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java
new file mode 100644
index 0000000..d97340f
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.source;
+
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+final class CamelNettyhttpPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyhttpPropertyFactory> {
+    // Fluent property builder for the netty-http source connector tests; instances are obtained from basic().
+    private CamelNettyhttpPropertyFactory() {
+    }
+    // Each with* setter below passes one option to setProperty under its "camel.source.path.*" key.
+    public CamelNettyhttpPropertyFactory withProtocol(String value) {
+        return setProperty("camel.source.path.protocol", value);
+    }
+
+    public CamelNettyhttpPropertyFactory withHost(String value) {
+        return setProperty("camel.source.path.host", value);
+    }
+
+    public CamelNettyhttpPropertyFactory withPort(int value) {
+        return setProperty("camel.source.path.port", value);
+    }
+
+    public CamelNettyhttpPropertyFactory withPath(String value) {
+        return setProperty("camel.source.path.path", value);
+    }
+    // Formats "netty-http:<protocol>://<host>:<port>/<path>" and hands it to EndpointUrlBuilder with withSourceUrl as the setter.
+    public EndpointUrlBuilder<CamelNettyhttpPropertyFactory> withUrl(String protocol, String host, int port, String path) {
+        String url = String.format("netty-http:%s://%s:%s/%s", protocol, host, port, path);
+        return new EndpointUrlBuilder<>(this::withSourceUrl, url);
+    }
+    // Baseline configuration: name, one task, String converters, a "tostring" transform to java.lang.String, DEBUG content logging.
+    public static CamelNettyhttpPropertyFactory basic() {
+        return new CamelNettyhttpPropertyFactory()
+                .withName("CamelNettyhttpSourceConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withTransformsConfig("tostring")
+                .withEntry("type", "org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value")
+                .withEntry("target.type", "java.lang.String")
+                .end()
+                .withSourceContentLogginglevel(LoggingLevel.DEBUG);
+    }
+}
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java
similarity index 58%
copy from tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
copy to tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java
index b2ef5ee..e1c28de 100644
--- a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java
@@ -15,23 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kafkaconnector.netty.source;
+package org.apache.camel.kafkaconnector.nettyhttp.source;
 
-import java.io.PrintWriter;
-import java.net.Socket;
+import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
 import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
 import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
-public class CamelSourceNettyITCase extends CamelSourceTestSupport {
+@Disabled("Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string #969")
+public class CamelSourceNettyhttpITCase extends CamelSourceTestSupport {
+    private final String host = NetworkUtils.getHostname();
     private final int port = NetworkUtils.getFreePort();
 
     private final int expect = 1;
@@ -39,7 +47,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
 
     @Override
     protected String[] getConnectorsInTest() {
-        return new String[] {"camel-netty-kafka-connector"};
+        return new String[] {"camel-netty-http-kafka-connector"};
     }
 
     @BeforeEach
@@ -49,20 +57,20 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
 
     @Override
     protected void produceTestData() {
-        try {
-            // TODO necessary to wait for ckc netty endpoint to be up and ready
-            Thread.sleep(3000);
-        } catch (Exception ignored) {
-        }
+        TestUtils.waitFor(() -> NetworkUtils.portIsOpen(host, port));
         sendMessage();
     }
 
     void sendMessage() {
-        try (Socket s = new Socket(NetworkUtils.getHostname(), port);
-             PrintWriter out = new PrintWriter(s.getOutputStream())) {
-            out.print("Hello CKC!");
-            out.flush();
-        } catch (Exception e) {
+        OkHttpClient client = new OkHttpClient();
+        RequestBody body = RequestBody.create(MediaType.get("text/plain; charset=utf-8"), "Hello CKC!");
+        Request request = new Request.Builder()
+                .url("http://" + host + ":" + port + "/test")
+                .post(body)
+                .build();
+        try (Response response = client.newCall(request).execute()) {
+            assertEquals(200, response.code(), "Source endpoint didn't return 200");
+        } catch (IOException e) {
             fail(e.getMessage(), e);
         }
     }
@@ -70,7 +78,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
     @Override
     protected void verifyMessages(TestMessageConsumer<?> consumer) {
         int received = consumer.consumedMessages().size();
-        Object receivedObject = consumer.consumedMessages().get(0).value();
+        String receivedObject = (String) consumer.consumedMessages().get(0).value();
         assertEquals(expect, received, "Did not receive as many messages as expected");
         assertEquals("Hello CKC!", receivedObject, "Received message content differed");
     }
@@ -78,30 +86,24 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
     @Test
     @Timeout(30)
     public void testLaunchConnector() throws ExecutionException, InterruptedException {
-        CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
-                .basic()
+        CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
                 .withKafkaTopic(topicName)
-                .withProtocol("tcp")
-                // TODO https://github.com/apache/camel-kafka-connector/issues/924
-                .withHost("//" + NetworkUtils.getHostname())
+                .withProtocol("http")
+                .withHost(host)
                 .withPort(port)
-                // one-way as test client doesn't receive response
-                .withSync(false);
+                .withPath("test");
 
-        runTestBlocking(connectorPropertyFactory, topicName, expect);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
     @Timeout(30)
     public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException {
-        CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
-                .basic()
+        CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
                 .withKafkaTopic(topicName)
-                .withUrl("tcp", NetworkUtils.getHostname(), port)
-                // one-way as test client doesn't receive response
-                .append("sync", "false")
+                .withUrl("http", host, port, "test")
                 .buildUrl();
 
-        runTestBlocking(connectorPropertyFactory, topicName, expect);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
index b2ef5ee..481b15c 100644
--- a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
 import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
 import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -32,6 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public class CamelSourceNettyITCase extends CamelSourceTestSupport {
+    private final String host = NetworkUtils.getHostname();
     private final int port = NetworkUtils.getFreePort();
 
     private final int expect = 1;
@@ -49,11 +51,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
 
     @Override
     protected void produceTestData() {
-        try {
-            // TODO necessary to wait for ckc netty endpoint to be up and ready
-            Thread.sleep(3000);
-        } catch (Exception ignored) {
-        }
+        TestUtils.waitFor(() -> NetworkUtils.portIsOpen(host, port));
         sendMessage();
     }
 
@@ -83,7 +81,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
                 .withKafkaTopic(topicName)
                 .withProtocol("tcp")
                 // TODO https://github.com/apache/camel-kafka-connector/issues/924
-                .withHost("//" + NetworkUtils.getHostname())
+                .withHost("//" + host)
                 .withPort(port)
                 // one-way as test client doesn't receive response
                 .withSync(false);
@@ -97,7 +95,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
         CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
                 .basic()
                 .withKafkaTopic(topicName)
-                .withUrl("tcp", NetworkUtils.getHostname(), port)
+                .withUrl("tcp", host, port)
                 // one-way as test client doesn't receive response
                 .append("sync", "false")
                 .buildUrl();
diff --git a/tests/itests-parent/pom.xml b/tests/itests-parent/pom.xml
index 3c37eec..a109494 100644
--- a/tests/itests-parent/pom.xml
+++ b/tests/itests-parent/pom.xml
@@ -188,6 +188,23 @@
         </dependency>
     </dependencies>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.squareup.okhttp3</groupId>
+                <artifactId>okhttp</artifactId>
+                <version>${squareup-okhttp-version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>com.squareup.okhttp3</groupId>
+                <artifactId>mockwebserver</artifactId>
+                <version>${squareup-okhttp-version}</version>
+                <scope>test</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <build>
         <plugins>
             <plugin>
@@ -220,4 +237,4 @@
     </build>
 
 
-</project>
\ No newline at end of file
+</project>
diff --git a/tests/pom.xml b/tests/pom.xml
index 097d2bd..fadfc35 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -64,6 +64,7 @@
         <module>itests-sql</module>
         <module>itests-cxf</module>
         <module>itests-netty</module>
+        <module>itests-netty-http</module>
     </modules>