You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/03/12 17:58:07 UTC
[camel-kafka-connector] 03/12: fix #969 : Convert
NettyChannelBufferStreamCache from NettyHttpSource not converted to string.
This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch camel-kafka-connector-0.7.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 9825019ca3b70670c0c6ccd46baab12b71af32c8
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Fri Mar 5 22:17:03 2021 +0100
fix #969 : Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string.
---
tests/itests-netty-http/pom.xml | 9 ++
.../source/CamelNettyHTTPPropertyFactory.java | 60 ++++++++++
.../source/CamelSourceNettyHTTPITCase.java | 123 +++++++++++++++++++++
tests/pom.xml | 3 +-
4 files changed, 194 insertions(+), 1 deletion(-)
diff --git a/tests/itests-netty-http/pom.xml b/tests/itests-netty-http/pom.xml
index 16238cf..353c75e 100644
--- a/tests/itests-netty-http/pom.xml
+++ b/tests/itests-netty-http/pom.xml
@@ -37,6 +37,15 @@
<scope>test</scope>
</dependency>
+ <!-- test infra -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-infra-common</artifactId>
+ <version>${camel.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-netty-http</artifactId>
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java
new file mode 100644
index 0000000..e4df820
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.nettyhttp.source;
+
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+/** Builds the connector properties used by the Netty HTTP source integration test. */
+final class CamelNettyHTTPPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyHTTPPropertyFactory> {
+    // Private on purpose: the only way to obtain an instance is basic().
+    private CamelNettyHTTPPropertyFactory() { }
+
+    /** Creates a factory preset with one task, the connector name and class, and String key/value converters. */
+    public static CamelNettyHTTPPropertyFactory basic() {
+        return new CamelNettyHTTPPropertyFactory().withTasksMax(1)
+                .withName("CamelNettyHttpSourceConnector")
+                .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+
+    public CamelNettyHTTPPropertyFactory withProtocol(String protocol) {
+        return this.setProperty("camel.source.path.protocol", protocol);
+    }
+
+    public CamelNettyHTTPPropertyFactory withHost(String host) {
+        return this.setProperty("camel.source.path.host", host);
+    }
+
+    public CamelNettyHTTPPropertyFactory withPort(int port) {
+        return this.setProperty("camel.source.path.port", Integer.toString(port));
+    }
+
+    public CamelNettyHTTPPropertyFactory withReceiveBufferSize(int size) {
+        return this.setProperty("camel.source.endpoint.receiveBufferSize", Integer.toString(size));
+    }
+
+    public CamelNettyHTTPPropertyFactory withSync(boolean sync) {
+        return this.setProperty("camel.source.endpoint.sync", Boolean.toString(sync));
+    }
+
+    public CamelNettyHTTPPropertyFactory withCamelTypeConverterTransformTo(String targetClass) {
+        this.setProperty("transforms", "cameltypeconverter"); // registers the transform alias used by the two keys below
+        this.setProperty("transforms.cameltypeconverter.type", "org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value");
+        return this.setProperty("transforms.cameltypeconverter.target.type", targetClass);
+    }
+}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
new file mode 100644
index 0000000..41cb6e1
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.nettyhttp.source;
+
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class);
+    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost");
+    private static final String TEST_MESSAGE = "testMessage";
+    // Maximum number of POST attempts made by produceTestData() before the test is failed.
+    private static final int MAX_ATTEMPTS = 10;
+
+    private String topicName;
+
+    private final int expect = 1;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-netty-http-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        topicName = getTopicForTest(this);
+    }
+
+    @AfterEach
+    public void tearDown() {}
+
+    /** Configures the source on 0.0.0.0:HTTP_PORT with a String type-converter transform and runs the blocking test. */
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyHTTPPropertyFactory.basic()
+                .withKafkaTopic(topicName)
+                .withReceiveBufferSize(10)
+                .withHost("0.0.0.0")
+                .withPort(HTTP_PORT)
+                .withProtocol("http")
+                .withCamelTypeConverterTransformTo("java.lang.String");
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
+    }
+
+    /**
+     * Posts TEST_MESSAGE to 127.0.0.1:HTTP_PORT, retrying on IOException up to MAX_ATTEMPTS times with a
+     * 100 ms pause between attempts; fails the test when the last attempt fails or the thread is interrupted.
+     */
+    @Override
+    protected void produceTestData() {
+        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
+            try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+                InetAddress localhost = InetAddress.getByAddress(new byte[] {127, 0, 0, 1});
+                HttpPost httpPost = new HttpPost("http://" + localhost.getHostAddress() + ":" + HTTP_PORT);
+                httpPost.setEntity(new StringEntity(TEST_MESSAGE));
+                LOG.info("Executing request {} {}", httpPost.getMethod(), httpPost.getURI());
+
+                // try-with-resources closes the response even when the status assertion below throws.
+                try (CloseableHttpResponse response = httpclient.execute(httpPost)) {
+                    assertEquals(200, response.getStatusLine().getStatusCode());
+                } finally {
+                    httpPost.releaseConnection();
+                }
+                LOG.info("Request success at {} attempt.", attempt);
+                return;
+            } catch (IOException e) {
+                if (attempt == MAX_ATTEMPTS) {
+                    fail("There should be no exceptions in sending the http test message.", e);
+                }
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException interrupted) {
+                    Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+                    fail("Interrupted while waiting to retry sending the http test message.", interrupted);
+                }
+            }
+        }
+    }
+
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
+        assertEquals(expect, received, "Didn't process the expected amount of messages");
+        assertEquals(TEST_MESSAGE, consumer.consumedMessages().get(0).value().toString());
+    }
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index 37f2cf0..c735d3b 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -55,6 +55,7 @@
<module>itests-salesforce</module>
<module>itests-hdfs</module>
<module>itests-mongodb</module>
+ <module>itests-netty-http</module>
<module>itests-jdbc</module>
<module>itests-azure-common</module>
<module>itests-azure-storage-blob</module>
@@ -63,7 +64,7 @@
<module>itests-rabbitmq</module>
<module>itests-couchbase</module>
<module>itests-ssh</module>
- <module>itests-sql</module>
+ <module>itests-sql</module>
<module>itests-netty-http</module>
</modules>