You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ts...@apache.org on 2021/03/04 12:43:25 UTC
[camel-kafka-connector] branch master updated: Add netty-http
itests #1036
This is an automated email from the ASF dual-hosted git repository.
tsato pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new c2d6a93 Add netty-http itests #1036
c2d6a93 is described below
commit c2d6a9323dc11bbd40215e09c7775a69ca8f472c
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Thu Mar 4 19:50:25 2021 +0900
Add netty-http itests #1036
netty-http source itests are disabled due to #969
---
.../common/SinkConnectorPropertyFactory.java | 12 +--
.../common/SourceConnectorPropertyFactory.java | 29 ++++--
tests/itests-netty-http/pom.xml | 56 +++++++++++
.../sink/CamelNettyhttpPropertyFactory.java | 65 ++++++++++++
.../nettyhttp/sink/CamelSinkNettyhttpITCase.java | 111 +++++++++++++++++++++
.../source/CamelNettyhttpPropertyFactory.java | 63 ++++++++++++
.../source/CamelSourceNettyhttpITCase.java} | 62 ++++++------
.../netty/source/CamelSourceNettyITCase.java | 12 +--
tests/itests-parent/pom.xml | 19 +++-
tests/pom.xml | 1 +
10 files changed, 374 insertions(+), 56 deletions(-)
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java
index 0684164..356ee0d 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java
@@ -17,17 +17,15 @@
package org.apache.camel.kafkaconnector.common;
-public abstract class SinkConnectorPropertyFactory<T extends SinkConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> {
+import static org.apache.camel.kafkaconnector.CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF;
- public T withTopics(String topics) {
- getProperties().put("topics", topics);
+public abstract class SinkConnectorPropertyFactory<T extends SinkConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> {
- return (T) this;
+ public T withTopics(String topics) {
+ return setProperty("topics", topics);
}
public T withSinkUrl(String sinkUrl) {
- getProperties().put("camel.sink.url", sinkUrl);
-
- return (T) this;
+ return setProperty(CAMEL_SINK_URL_CONF, sinkUrl);
}
}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java
index 684459c..aa59552 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java
@@ -17,25 +17,32 @@
package org.apache.camel.kafkaconnector.common;
-public abstract class SourceConnectorPropertyFactory<T extends SourceConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> {
+import org.apache.camel.LoggingLevel;
- public T withKafkaTopic(String topic) {
- getProperties().put("topics", topic);
+import static org.apache.camel.kafkaconnector.CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME;
+import static org.apache.camel.kafkaconnector.CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF;
+import static org.apache.camel.kafkaconnector.CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF;
+import static org.apache.camel.kafkaconnector.CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF;
+import static org.apache.camel.kafkaconnector.CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF;
+import static org.apache.camel.kafkaconnector.CamelSourceConnectorConfig.TOPIC_CONF;
+
+public abstract class SourceConnectorPropertyFactory<T extends SourceConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> {
- return (T) this;
+ public T withKafkaTopic(String topic) {
+ return setProperty(TOPIC_CONF, topic);
}
public T withSourceUrl(String sourceUrl) {
- getProperties().put("camel.source.url", sourceUrl);
+ return setProperty(CAMEL_SOURCE_URL_CONF, sourceUrl);
+ }
- return (T) this;
+ public T withSourceContentLogginglevel(LoggingLevel level) {
+ return setProperty(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, level.toString());
}
public T withAggregate(String aggregate, int size, int timeout) {
- withBeans("aggregate", classRef(aggregate));
- getProperties().put("camel.aggregation.size", size);
- getProperties().put("camel.aggregation.timeout", timeout);
-
- return (T) this;
+ return withBeans(CAMEL_CONNECTOR_AGGREGATE_NAME, classRef(aggregate))
+ .setProperty(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, size)
+ .setProperty(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, timeout);
}
}
diff --git a/tests/itests-netty-http/pom.xml b/tests/itests-netty-http/pom.xml
new file mode 100644
index 0000000..16238cf
--- /dev/null
+++ b/tests/itests-netty-http/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-parent</artifactId>
+ <version>0.9.0-SNAPSHOT</version>
+ <relativePath>../itests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>itests-netty-http</artifactId>
+ <name>Camel-Kafka-Connector :: Tests :: Netty HTTP</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-netty-http</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelNettyhttpPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelNettyhttpPropertyFactory.java
new file mode 100644
index 0000000..9754e7d
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelNettyhttpPropertyFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+final class CamelNettyhttpPropertyFactory extends SinkConnectorPropertyFactory<CamelNettyhttpPropertyFactory> {
+
+ private CamelNettyhttpPropertyFactory() {
+ }
+
+ public CamelNettyhttpPropertyFactory withProtocol(String value) {
+ return setProperty("camel.sink.path.protocol", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withHost(String value) {
+ return setProperty("camel.sink.path.host", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withPort(int value) {
+ return setProperty("camel.sink.path.port", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withPath(String value) {
+ return setProperty("camel.sink.path.path", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withDisconnect(boolean value) {
+ return setProperty("camel.sink.endpoint.disconnect", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withSync(boolean value) {
+ return setProperty("camel.sink.endpoint.sync", value);
+ }
+
+ public EndpointUrlBuilder<CamelNettyhttpPropertyFactory> withUrl(String protocol, String host, int port, String path) {
+ String url = String.format("netty-http:%s://%s:%s/%s", protocol, host, port, path);
+ return new EndpointUrlBuilder<>(this::withSinkUrl, url);
+ }
+
+ public static CamelNettyhttpPropertyFactory basic() {
+ return new CamelNettyhttpPropertyFactory()
+ .withName("CamelNettyhttpSinkConnector")
+ .withTasksMax(1)
+ .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSinkConnector")
+ .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+ }
+}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
new file mode 100644
index 0000000..cdf8b2c
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.sink;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(CamelSinkNettyhttpITCase.class);
+
+ private MockWebServer mockServer;
+
+ private String topicName;
+
+ private final int expect = 1;
+ private volatile RecordedRequest received;
+
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[] {"camel-netty-http-kafka-connector"};
+ }
+
+ @BeforeEach
+ public void setUp() {
+ topicName = getTopicForTest(this);
+ mockServer = new MockWebServer();
+ received = null;
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (mockServer != null) {
+ mockServer.shutdown();
+ }
+ }
+
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
+ try {
+ received = mockServer.takeRequest();
+ } catch (InterruptedException e) {
+ LOG.error("Unable to receive messages: {}", e.getMessage(), e);
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ String expected = "Sink test message 0";
+ if (latch.await(30, TimeUnit.SECONDS)) {
+ assertEquals("/test", received.getPath(), "Received path differed");
+ assertEquals(expected, received.getBody().readUtf8(), "Received message content differed");
+ } else {
+ fail("Failed to receive the messages within the specified time");
+ }
+ }
+
+ @Test
+ @Timeout(30)
+ public void testBasicSendReceive() throws Exception {
+ ConnectorPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
+ .withTopics(topicName)
+ .withProtocol("http")
+ .withHost(mockServer.getHostName())
+ .withPort(mockServer.getPort())
+ .withPath("test");
+
+ runTest(connectorPropertyFactory, topicName, expect);
+ }
+
+ @Test
+ @Timeout(30)
+ public void testBasicSendReceiveUsingUrl() throws Exception {
+ ConnectorPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
+ .withTopics(topicName)
+ .withUrl("http", mockServer.getHostName(), mockServer.getPort(), "test")
+ .buildUrl();
+
+ runTest(connectorPropertyFactory, topicName, expect);
+ }
+}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java
new file mode 100644
index 0000000..d97340f
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.source;
+
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+final class CamelNettyhttpPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyhttpPropertyFactory> {
+
+ private CamelNettyhttpPropertyFactory() {
+ }
+
+ public CamelNettyhttpPropertyFactory withProtocol(String value) {
+ return setProperty("camel.source.path.protocol", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withHost(String value) {
+ return setProperty("camel.source.path.host", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withPort(int value) {
+ return setProperty("camel.source.path.port", value);
+ }
+
+ public CamelNettyhttpPropertyFactory withPath(String value) {
+ return setProperty("camel.source.path.path", value);
+ }
+
+ public EndpointUrlBuilder<CamelNettyhttpPropertyFactory> withUrl(String protocol, String host, int port, String path) {
+ String url = String.format("netty-http:%s://%s:%s/%s", protocol, host, port, path);
+ return new EndpointUrlBuilder<>(this::withSourceUrl, url);
+ }
+
+ public static CamelNettyhttpPropertyFactory basic() {
+ return new CamelNettyhttpPropertyFactory()
+ .withName("CamelNettyhttpSourceConnector")
+ .withTasksMax(1)
+ .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector")
+ .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .withTransformsConfig("tostring")
+ .withEntry("type", "org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value")
+ .withEntry("target.type", "java.lang.String")
+ .end()
+ .withSourceContentLogginglevel(LoggingLevel.DEBUG);
+ }
+}
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java
similarity index 58%
copy from tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
copy to tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java
index b2ef5ee..e1c28de 100644
--- a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java
@@ -15,23 +15,31 @@
* limitations under the License.
*/
-package org.apache.camel.kafkaconnector.netty.source;
+package org.apache.camel.kafkaconnector.nettyhttp.source;
-import java.io.PrintWriter;
-import java.net.Socket;
+import java.io.IOException;
import java.util.concurrent.ExecutionException;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
-public class CamelSourceNettyITCase extends CamelSourceTestSupport {
+@Disabled("Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string #969")
+public class CamelSourceNettyhttpITCase extends CamelSourceTestSupport {
+ private final String host = NetworkUtils.getHostname();
private final int port = NetworkUtils.getFreePort();
private final int expect = 1;
@@ -39,7 +47,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
@Override
protected String[] getConnectorsInTest() {
- return new String[] {"camel-netty-kafka-connector"};
+ return new String[] {"camel-netty-http-kafka-connector"};
}
@BeforeEach
@@ -49,20 +57,20 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
@Override
protected void produceTestData() {
- try {
- // TODO necessary to wait for ckc netty endpoint to be up and ready
- Thread.sleep(3000);
- } catch (Exception ignored) {
- }
+ TestUtils.waitFor(() -> NetworkUtils.portIsOpen(host, port));
sendMessage();
}
void sendMessage() {
- try (Socket s = new Socket(NetworkUtils.getHostname(), port);
- PrintWriter out = new PrintWriter(s.getOutputStream())) {
- out.print("Hello CKC!");
- out.flush();
- } catch (Exception e) {
+ OkHttpClient client = new OkHttpClient();
+ RequestBody body = RequestBody.create(MediaType.get("text/plain; charset=utf-8"), "Hello CKC!");
+ Request request = new Request.Builder()
+ .url("http://" + host + ":" + port + "/test")
+ .post(body)
+ .build();
+ try (Response response = client.newCall(request).execute()) {
+ assertEquals(200, response.code(), "Source endpoint didn't return 200");
+ } catch (IOException e) {
fail(e.getMessage(), e);
}
}
@@ -70,7 +78,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
@Override
protected void verifyMessages(TestMessageConsumer<?> consumer) {
int received = consumer.consumedMessages().size();
- Object receivedObject = consumer.consumedMessages().get(0).value();
+ String receivedObject = (String) consumer.consumedMessages().get(0).value();
assertEquals(expect, received, "Did not receive as many messages as expected");
assertEquals("Hello CKC!", receivedObject, "Received message content differed");
}
@@ -78,30 +86,24 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
@Test
@Timeout(30)
public void testLaunchConnector() throws ExecutionException, InterruptedException {
- CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
- .basic()
+ CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
.withKafkaTopic(topicName)
- .withProtocol("tcp")
- // TODO https://github.com/apache/camel-kafka-connector/issues/924
- .withHost("//" + NetworkUtils.getHostname())
+ .withProtocol("http")
+ .withHost(host)
.withPort(port)
- // one-way as test client doesn't receive response
- .withSync(false);
+ .withPath("test");
- runTestBlocking(connectorPropertyFactory, topicName, expect);
+ runTest(connectorPropertyFactory, topicName, expect);
}
@Test
@Timeout(30)
public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException {
- CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
- .basic()
+ CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
.withKafkaTopic(topicName)
- .withUrl("tcp", NetworkUtils.getHostname(), port)
- // one-way as test client doesn't receive response
- .append("sync", "false")
+ .withUrl("http", host, port, "test")
.buildUrl();
- runTestBlocking(connectorPropertyFactory, topicName, expect);
+ runTest(connectorPropertyFactory, topicName, expect);
}
}
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
index b2ef5ee..481b15c 100644
--- a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -32,6 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class CamelSourceNettyITCase extends CamelSourceTestSupport {
+ private final String host = NetworkUtils.getHostname();
private final int port = NetworkUtils.getFreePort();
private final int expect = 1;
@@ -49,11 +51,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
@Override
protected void produceTestData() {
- try {
- // TODO necessary to wait for ckc netty endpoint to be up and ready
- Thread.sleep(3000);
- } catch (Exception ignored) {
- }
+ TestUtils.waitFor(() -> NetworkUtils.portIsOpen(host, port));
sendMessage();
}
@@ -83,7 +81,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
.withKafkaTopic(topicName)
.withProtocol("tcp")
// TODO https://github.com/apache/camel-kafka-connector/issues/924
- .withHost("//" + NetworkUtils.getHostname())
+ .withHost("//" + host)
.withPort(port)
// one-way as test client doesn't receive response
.withSync(false);
@@ -97,7 +95,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
.basic()
.withKafkaTopic(topicName)
- .withUrl("tcp", NetworkUtils.getHostname(), port)
+ .withUrl("tcp", host, port)
// one-way as test client doesn't receive response
.append("sync", "false")
.buildUrl();
diff --git a/tests/itests-parent/pom.xml b/tests/itests-parent/pom.xml
index 3c37eec..a109494 100644
--- a/tests/itests-parent/pom.xml
+++ b/tests/itests-parent/pom.xml
@@ -188,6 +188,23 @@
</dependency>
</dependencies>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ <version>${squareup-okhttp-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>${squareup-okhttp-version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<build>
<plugins>
<plugin>
@@ -220,4 +237,4 @@
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/tests/pom.xml b/tests/pom.xml
index 097d2bd..fadfc35 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -64,6 +64,7 @@
<module>itests-sql</module>
<module>itests-cxf</module>
<module>itests-netty</module>
+ <module>itests-netty-http</module>
</modules>