You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ts...@apache.org on 2021/03/02 05:36:26 UTC

[camel-kafka-connector] branch master updated: Add netty sink itest #1036

This is an automated email from the ASF dual-hosted git repository.

tsato pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a68e595  Add netty sink itest #1036
a68e595 is described below

commit a68e5951e03add15a63ee3fcd7dd83355e6a7a3e
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Tue Mar 2 11:53:49 2021 +0900

    Add netty sink itest #1036
---
 .../netty/sink/CamelNettyPropertyFactory.java      |  61 +++++++++++
 .../netty/sink/CamelSinkNettyITCase.java           | 119 +++++++++++++++++++++
 .../netty/source/CamelSourceNettyITCase.java       |   5 +-
 3 files changed, 182 insertions(+), 3 deletions(-)

diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelNettyPropertyFactory.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelNettyPropertyFactory.java
new file mode 100644
index 0000000..31b5343
--- /dev/null
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelNettyPropertyFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.netty.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+final class CamelNettyPropertyFactory extends SinkConnectorPropertyFactory<CamelNettyPropertyFactory> {
+    // Not instantiable from outside: basic() is the only place an instance is created.
+    private CamelNettyPropertyFactory() {
+    }
+
+    public static CamelNettyPropertyFactory basic() {
+        final String stringConverter = "org.apache.kafka.connect.storage.StringConverter";
+        return new CamelNettyPropertyFactory().withName("CamelNettySinkConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.netty.CamelNettySinkConnector")
+                .withKeyConverterClass(stringConverter)
+                .withValueConverterClass(stringConverter);
+    }
+
+    public EndpointUrlBuilder<CamelNettyPropertyFactory> withUrl(String protocol, String host, int port) {
+        // Base URI has the form netty:<protocol>://<host>:<port>; the builder feeds the result to withSinkUrl.
+        return new EndpointUrlBuilder<>(this::withSinkUrl, "netty:" + protocol + "://" + host + ":" + port);
+    }
+
+    public CamelNettyPropertyFactory withProtocol(String value) {
+        return setProperty("camel.sink.path.protocol", value);
+    }
+
+    public CamelNettyPropertyFactory withHost(String value) {
+        return setProperty("camel.sink.path.host", value);
+    }
+
+    public CamelNettyPropertyFactory withPort(int value) {
+        return setProperty("camel.sink.path.port", value);
+    }
+
+    public CamelNettyPropertyFactory withSync(boolean value) {
+        return setProperty("camel.sink.endpoint.sync", value);
+    }
+
+    public CamelNettyPropertyFactory withDisconnect(boolean value) {
+        return setProperty("camel.sink.endpoint.disconnect", value);
+    }
+}
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java
new file mode 100644
index 0000000..bb08243
--- /dev/null
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.netty.sink;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSinkNettyITCase extends CamelSinkTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkNettyITCase.class);
+    // Port used both by the mock server socket below and by the connector configuration in the tests.
+    private static final int PORT = NetworkUtils.getFreePort("localhost");
+
+    private String topicName;
+    private final int expect = 1;
+    private volatile String received;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-netty-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        received = null;
+        topicName = getTopicForTest(this);
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try (ServerSocket server = new ServerSocket(PORT);
+             Socket client = server.accept();
+             InputStream in = client.getInputStream();
+             BufferedReader lineReader = new BufferedReader(new InputStreamReader(in))) {
+            received = lineReader.readLine();
+            LOG.debug("Received: {}", received);
+        } catch (IOException e) {
+            LOG.error("Unable to receive messages: {}", e.getMessage(), e);
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        boolean completed = latch.await(30, TimeUnit.SECONDS);
+        if (!completed) {
+            fail("Failed to receive the messages within the specified time");
+        }
+        // The mock server reads a single line, so exactly one message is compared.
+        assertEquals("Sink test message 0", received, "Received message content differed");
+    }
+
+    @Test
+    @Timeout(30)
+    public void testBasicSendReceive() throws Exception {
+        ConnectorPropertyFactory factory = CamelNettyPropertyFactory.basic()
+                .withTopics(topicName)
+                .withProtocol("tcp")
+                // TODO drop the leading "//" once https://github.com/apache/camel-kafka-connector/issues/924 is resolved
+                .withHost("//localhost")
+                .withPort(PORT)
+                // close the connection after sending so the mock server socket is not held open
+                .withDisconnect(true)
+                // fire-and-forget: the mock server never writes a reply
+                .withSync(false);
+
+        runTest(factory, topicName, expect);
+    }
+
+    @Test
+    @Timeout(30)
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        ConnectorPropertyFactory factory = CamelNettyPropertyFactory.basic()
+                .withTopics(topicName)
+                .withUrl("tcp", "localhost", PORT)
+                // close the connection after sending so the mock server socket is not held open
+                .append("disconnect", "true")
+                // fire-and-forget: the mock server never writes a reply
+                .append("sync", "false")
+                .buildUrl();
+
+        runTest(factory, topicName, expect);
+    }
+}
diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
index 6c76789..5384e22 100644
--- a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
+++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java
@@ -25,7 +25,6 @@ import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
 import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
@@ -80,13 +79,13 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport {
 
     @Test
     @Timeout(30)
-    @Disabled("Camel-Netty-* connectors are not working #924")
     public void testLaunchConnector() throws ExecutionException, InterruptedException {
         CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory
                 .basic()
                 .withKafkaTopic(topicName)
                 .withProtocol("tcp")
-                .withHost("localhost")
+                // TODO https://github.com/apache/camel-kafka-connector/issues/924
+                .withHost("//localhost")
                 .withPort(PORT)
                 // one-way as test client doesn't receive response
                 .withSync(false);