You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/03/02 08:14:37 UTC
[camel-kafka-connector] 01/02: Add Netty source itest #1036
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch backport-netty-itests
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 4c215981a84ae8103f059b1b35d1482970d7a874
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Thu Feb 25 23:17:26 2021 +0900
Add Netty source itest #1036
---
.../common/test/CamelSourceTestSupport.java | 15 +++
tests/itests-netty/pom.xml | 45 +++++++++
.../netty/source/CamelNettyPropertyFactory.java | 57 +++++++++++
.../netty/source/CamelSourceNettyITCase.java | 110 +++++++++++++++++++++
tests/pom.xml | 1 +
5 files changed, 228 insertions(+)
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
index 5bb6a93..016525a 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -88,6 +88,21 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
* A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
*
* @param connectorPropertyFactory A factory for connector properties
+ * @param topic the topic to send the messages to
+ * @param count the number of messages to send
+ * @throws Exception For test-specific exceptions
+ */
+ public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws ExecutionException, InterruptedException {
+ KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+ StringMessageConsumer consumer = new StringMessageConsumer(kafkaClient, topic, count);
+
+ runTestBlocking(connectorPropertyFactory, consumer);
+ }
+
+ /**
+ * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+ *
+ * @param connectorPropertyFactory A factory for connector properties
* @param consumer A Kafka consumer consumer for the test messages
* @throws Exception For test-specific exceptions
*/
diff --git a/tests/itests-netty/pom.xml b/tests/itests-netty/pom.xml
new file mode 100644
index 0000000..1ef8735
--- /dev/null
+++ b/tests/itests-netty/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-parent</artifactId>
+ <version>0.9.0-SNAPSHOT</version>
+ <relativePath>../itests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>itests-netty</artifactId>
+ <name>Camel-Kafka-Connector :: Tests :: Netty</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-netty</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java
new file mode 100644
index 0000000..d0f6132
--- /dev/null
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.netty.source;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+final class CamelNettyPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyPropertyFactory> {
+
+ private CamelNettyPropertyFactory() {
+ }
+
+ public CamelNettyPropertyFactory withProtocol(String value) {
+ return setProperty("camel.source.path.protocol", value);
+ }
+
+ public CamelNettyPropertyFactory withHost(String value) {
+ return setProperty("camel.source.path.host", value);
+ }
+
+ public CamelNettyPropertyFactory withPort(int value) {
+ return setProperty("camel.source.path.port", value);
+ }
+
+ public CamelNettyPropertyFactory withSync(boolean value) {
+ return setProperty("camel.source.endpoint.sync", value);
+ }
+
+ public EndpointUrlBuilder<CamelNettyPropertyFactory> withUrl(String protocol, String host, int port) {
+ String url = String.format("netty:%s://%s:%s", protocol, host, port);
+ return new EndpointUrlBuilder<>(this::withSourceUrl, url);
+ }
+
+ public static CamelNettyPropertyFactory basic() {
+ return new CamelNettyPropertyFactory()
+ .withName("CamelNettySourceConnector")
+ .withTasksMax(1)
+ .withConnectorClass("org.apache.camel.kafkaconnector.netty.CamelNettySourceConnector")
+ .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+ }
+}
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
new file mode 100644
index 0000000..6c76789
--- /dev/null
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.netty.source;
+
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSourceNettyITCase extends CamelSourceTestSupport {
+ private static final int PORT = NetworkUtils.getFreePort("localhost");
+
+ private final int expect = 1;
+ private String topicName;
+
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[] {"camel-netty-kafka-connector"};
+ }
+
+ @BeforeEach
+ public void setUp() {
+ topicName = getTopicForTest(this);
+ }
+
+ @Override
+ protected void produceTestData() {
+ try {
+ // TODO necessary to wait for ckc netty endpoint to be up and ready
+ Thread.sleep(3000);
+ } catch (Exception ignored) {
+ }
+ sendMessage();
+ }
+
+ void sendMessage() {
+ try (Socket s = new Socket("localhost", PORT);
+ PrintWriter out = new PrintWriter(s.getOutputStream())) {
+ out.print("Hello CKC!");
+ out.flush();
+ } catch (Exception e) {
+ fail(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ protected void verifyMessages(TestMessageConsumer<?> consumer) {
+ int received = consumer.consumedMessages().size();
+ Object receivedObject = consumer.consumedMessages().get(0).value();
+ assertEquals(expect, received, "Did not receive as many messages as expected");
+ assertEquals("Hello CKC!", receivedObject, "Received message content differed");
+ }
+
+ @Test
+ @Timeout(30)
+ @Disabled("Camel-Netty-* connectors are not working #924")
+ public void testLaunchConnector() throws ExecutionException, InterruptedException {
+ CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
+ .basic()
+ .withKafkaTopic(topicName)
+ .withProtocol("tcp")
+ .withHost("localhost")
+ .withPort(PORT)
+ // one-way as test client doesn't receive response
+ .withSync(false);
+
+ runTestBlocking(connectorPropertyFactory, topicName, expect);
+ }
+
+ @Test
+ @Timeout(30)
+ public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException {
+ CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
+ .basic()
+ .withKafkaTopic(topicName)
+ .withUrl("tcp", "localhost", PORT)
+ // one-way as test client doesn't receive response
+ .append("sync", "false")
+ .buildUrl();
+
+ runTestBlocking(connectorPropertyFactory, topicName, expect);
+ }
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index 26aa43f..b1ec328 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -62,6 +62,7 @@
<module>itests-ssh</module>
<module>itests-sql</module>
<module>itests-cxf</module>
+ <module>itests-netty</module>
</modules>