You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/03/12 17:58:07 UTC

[camel-kafka-connector] 03/12: fix #969 : Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string.

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-kafka-connector-0.7.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 9825019ca3b70670c0c6ccd46baab12b71af32c8
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Fri Mar 5 22:17:03 2021 +0100

    fix #969 : Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string.
---
 tests/itests-netty-http/pom.xml                    |   9 ++
 .../source/CamelNettyHTTPPropertyFactory.java      |  60 ++++++++++
 .../source/CamelSourceNettyHTTPITCase.java         | 123 +++++++++++++++++++++
 tests/pom.xml                                      |   3 +-
 4 files changed, 194 insertions(+), 1 deletion(-)

diff --git a/tests/itests-netty-http/pom.xml b/tests/itests-netty-http/pom.xml
index 16238cf..353c75e 100644
--- a/tests/itests-netty-http/pom.xml
+++ b/tests/itests-netty-http/pom.xml
@@ -37,6 +37,15 @@
             <scope>test</scope>
         </dependency>
 
+        <!-- test infra -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-infra-common</artifactId>
+            <version>${camel.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-netty-http</artifactId>
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java
new file mode 100644
index 0000000..e4df820
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.nettyhttp.source;
+
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+/** Builds the connector properties used by the Netty HTTP source integration tests. */
+final class CamelNettyHTTPPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyHTTPPropertyFactory> {
+    private CamelNettyHTTPPropertyFactory() {
+    }
+    /** Returns a new factory with one task, the connector name/class and String key/value converters set. */
+    public static CamelNettyHTTPPropertyFactory basic() {
+        return new CamelNettyHTTPPropertyFactory()
+                .withTasksMax(1)
+                .withName("CamelNettyHttpSourceConnector")
+                .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+    /** Sets {@code camel.source.path.host}. */
+    public CamelNettyHTTPPropertyFactory withHost(String host) {
+        return setProperty("camel.source.path.host", host);
+    }
+    /** Sets {@code camel.source.path.protocol}. */
+    public CamelNettyHTTPPropertyFactory withProtocol(String protocol) {
+        return setProperty("camel.source.path.protocol", protocol);
+    }
+    /** Sets {@code camel.source.path.port} to the decimal text of {@code port}. */
+    public CamelNettyHTTPPropertyFactory withPort(int port) {
+        return setProperty("camel.source.path.port", Integer.toString(port));
+    }
+    /** Sets {@code camel.source.endpoint.sync} to {@code "true"} or {@code "false"}. */
+    public CamelNettyHTTPPropertyFactory withSync(boolean sync) {
+        return setProperty("camel.source.endpoint.sync", Boolean.toString(sync));
+    }
+    /** Sets {@code camel.source.endpoint.receiveBufferSize} to the decimal text of {@code size}. */
+    public CamelNettyHTTPPropertyFactory withReceiveBufferSize(int size) {
+        return setProperty("camel.source.endpoint.receiveBufferSize", Integer.toString(size));
+    }
+    /** Registers the {@code cameltypeconverter} transform on the record value with {@code targetClass} as target type. */
+    public CamelNettyHTTPPropertyFactory withCamelTypeConverterTransformTo(String targetClass) {
+        setProperty("transforms", "cameltypeconverter");
+        setProperty("transforms.cameltypeconverter.type", "org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value");
+        return setProperty("transforms.cameltypeconverter.target.type", targetClass);
+    }
+}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
new file mode 100644
index 0000000..41cb6e1
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.nettyhttp.source;
+
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class);
+    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost");
+    private static final String TEST_MESSAGE = "testMessage";
+
+    private String topicName;
+
+    private final int expect = 1;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-netty-http-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        topicName = getTopicForTest(this);
+    }
+
+    @AfterEach
+    public void tearDown() {}
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyHTTPPropertyFactory.basic()
+                .withKafkaTopic(topicName)
+                .withReceiveBufferSize(10)
+                .withHost("0.0.0.0")
+                .withPort(HTTP_PORT)
+                .withProtocol("http")
+                .withCamelTypeConverterTransformTo("java.lang.String");
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Override
+    protected void produceTestData() {
+        int retriesLeft = 10;
+        boolean success = false;
+        while(retriesLeft > 0 && !success) {
+            try (final CloseableHttpClient httpclient = HttpClients.createDefault()) {
+
+                byte[] ipAddr = new byte[]{127, 0, 0, 1};
+                InetAddress localhost = InetAddress.getByAddress(ipAddr);
+                final HttpPost httpPost = new HttpPost("http://" + localhost.getHostAddress() + ":" + HTTP_PORT);
+
+                LOG.info("Executing request {} {}", httpPost.getMethod(), httpPost.getURI());
+
+                httpPost.setEntity(new StringEntity(TEST_MESSAGE));
+
+                CloseableHttpResponse response = httpclient.execute(httpPost);
+                assertEquals(200, response.getStatusLine().getStatusCode());
+                response.close();
+                httpPost.releaseConnection();
+                success = true;
+                LOG.info("Request success at {} attempt.", retriesLeft);
+            } catch (IOException e) {
+                if(retriesLeft == 1) {
+                    e.printStackTrace();
+                    fail("There should be no exceptions in sending the http test message.");
+                } else {
+                    retriesLeft--;
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException interruptedException) {
+                        interruptedException.printStackTrace();
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
+        assertEquals(expect, received, "Didn't process the expected amount of messages");
+        assertEquals(TEST_MESSAGE, consumer.consumedMessages().get(0).value().toString());
+    }
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index 37f2cf0..c735d3b 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -55,6 +55,7 @@
         <module>itests-salesforce</module>
         <module>itests-hdfs</module>
         <module>itests-mongodb</module>
+        <module>itests-netty-http</module>
         <module>itests-jdbc</module>
         <module>itests-azure-common</module>
         <module>itests-azure-storage-blob</module>
@@ -63,7 +64,7 @@
         <module>itests-rabbitmq</module>
         <module>itests-couchbase</module>
         <module>itests-ssh</module>
-        <module>itests-sql</module>
+        <module>itests-sql</module>
         <module>itests-netty-http</module>
     </modules>