Posted to commits@camel.apache.org by ac...@apache.org on 2021/03/07 13:45:33 UTC

[camel-kafka-connector] branch master updated (744e903 -> 98f6ffb)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from 744e903  Updated CHANGELOG.md
     new 42456d6  Fixed flaky hdfs itest.
     new 2a1a507  Properly handling UnitOfWork by completing it at the last possible moment, properly fix #202
     new 63ef1f0  fix #969 : Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string.
     new f6a3e04  Fixed itest for netty-http.
     new 98f6ffb  chore: fix checkstyle.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 core/pom.xml                                       |  19 +++-
 .../kafkaconnector/CamelSourceConnectorConfig.java |   5 +
 .../camel/kafkaconnector/CamelSourceRecord.java    |  59 ++++++++++
 .../camel/kafkaconnector/CamelSourceTask.java      |  82 +++++++++++---
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  48 +++++++++
 .../CamelTypeConverterTransformTest.java           |  24 +++++
 parent/pom.xml                                     |   8 +-
 .../hdfs/sink/CamelSinkHDFSITCase.java             |  19 ++--
 .../camel/kafkaconnector/hdfs/utils/HDFSEasy.java  |  26 ++++-
 tests/itests-netty-http/pom.xml                    |  14 +++
 .../nettyhttp/sink/CamelSinkNettyhttpITCase.java   |   5 +-
 .../source/CamelNettyHTTPPropertyFactory.java      |  60 +++++++++++
 .../source/CamelNettyhttpPropertyFactory.java      |  63 -----------
 .../source/CamelSourceNettyHTTPITCase.java         | 119 +++++++++++++++++++++
 .../source/CamelSourceNettyhttpITCase.java         | 109 -------------------
 15 files changed, 459 insertions(+), 201 deletions(-)
 create mode 100644 core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
 create mode 100644 tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java
 delete mode 100644 tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java
 create mode 100644 tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
 delete mode 100644 tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java


[camel-kafka-connector] 05/05: chore: fix checkstyle.

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 98f6ffb68cc7a339ce6c6fd19d5be4c0f77f1d3c
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Sat Mar 6 15:46:21 2021 +0100

    chore: fix checkstyle.
---
 .../camel/kafkaconnector/CamelSourceRecord.java    | 22 +++++++++++++++++++---
 .../camel/kafkaconnector/CamelSourceTask.java      | 20 +++++++++++---------
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  2 --
 .../CamelTypeConverterTransformTest.java           |  1 -
 .../hdfs/sink/CamelSinkHDFSITCase.java             | 13 ++++++-------
 .../source/CamelSourceNettyHTTPITCase.java         | 14 +++++---------
 6 files changed, 41 insertions(+), 31 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
index 87934ef..5d03b89 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
@@ -1,13 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.kafkaconnector;
 
+import java.util.Map;
+
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.source.SourceRecord;
 
-import java.util.Map;
-
 public class CamelSourceRecord extends SourceRecord {
-    private Integer claimCheck = null;
+    private Integer claimCheck;
 
     public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema valueSchema, Object value) {
         super(sourcePartition, sourceOffset, topic, partition, valueSchema, value);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 03d0c1a..51b055d 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -16,6 +16,15 @@
  */
 package org.apache.camel.kafkaconnector;
 
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedExchange;
@@ -38,14 +47,7 @@ import org.jctools.queues.SpscArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
+
 
 public class CamelSourceTask extends SourceTask {
     public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
@@ -128,7 +130,7 @@ public class CamelSourceTask extends SourceTask {
 
             freeSlots = new SpscArrayQueue<>(maxNotCommittedRecords);
             freeSlots.fill(new MessagePassingQueue.Supplier<Integer>() {
-                int i = 0;
+                int i;
                 @Override
                 public Integer get() {
                     return i++;
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 51b4db3..5c99ad0 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.kafkaconnector;
 
-import java.awt.print.PrinterJob;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Date;
@@ -28,7 +27,6 @@ import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.kafkaconnector.utils.StringJoinerAggregator;
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
index c6cecbf..6da72c2 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.kafkaconnector.transforms;
 
-import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.HashMap;
diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
index 55cf21f..a111fdc 100644
--- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
+++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
@@ -14,9 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.kafkaconnector.hdfs.sink;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
@@ -36,11 +40,6 @@ import org.junit.runners.model.InitializationError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -85,7 +84,7 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
         boolean hdfsServiceCorrectlyStarted = TestUtils.waitFor(() -> hdfsEasy.createFile(new Path(currentBasePath, "initTest"), "test")
                                                                         &&  hdfsEasy.delete(new Path(currentBasePath, "initTest")));
 
-        if(hdfsServiceCorrectlyStarted) {
+        if (hdfsServiceCorrectlyStarted) {
             if (!hdfsEasy.delete(currentBasePath)) {
                 // This is OK: directory may not exist on the path
                 LOG.debug("The directory at {} was not removed", currentBasePath.getName());
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
index 48bcb59..0174eb1 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.kafkaconnector.nettyhttp.source;
 
+import java.io.IOException;
+import java.net.InetAddress;
+
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
 import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
@@ -25,7 +28,6 @@ import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -33,9 +35,6 @@ import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.InetAddress;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -59,9 +58,6 @@ public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
         topicName = getTopicForTest(this);
     }
 
-    @AfterEach
-    public void tearDown() {}
-
     @Test
     @Timeout(90)
     public void testBasicSendReceive() throws Exception {
@@ -81,7 +77,7 @@ public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
     protected void produceTestData() {
         int retriesLeft = 10;
         boolean success = false;
-        while(retriesLeft > 0 && !success) {
+        while (retriesLeft > 0 && !success) {
             try (final CloseableHttpClient httpclient = HttpClients.createDefault()) {
 
                 byte[] ipAddr = new byte[]{127, 0, 0, 1};
@@ -99,7 +95,7 @@ public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
                 success = true;
                 LOG.info("Request success at {} attempt.", retriesLeft);
             } catch (IOException e) {
-                if(retriesLeft == 1) {
+                if (retriesLeft == 1) {
                     e.printStackTrace();
                     fail("There should be no exceptions in sending the http test message.");
                 } else {

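The checkstyle fixes above reduce to a few project conventions visible in the hunks: java.* imports are grouped before third-party imports and separated by a blank line, fields are not initialized to their default values, and there is a space between control-flow keywords and the opening parenthesis. A minimal illustrative sketch of code that satisfies those rules (the class below is hypothetical, not part of the commit):

    import java.util.Map;

    import org.apache.kafka.connect.source.SourceRecord;

    public class StyleExample {
        private Integer claimCheck;                      // no redundant "= null" initializer

        public void drain(Map<String, SourceRecord> pending) {
            if (pending != null) {                       // space after "if"
                while (!pending.isEmpty()) {             // space after "while"
                    pending.clear();
                }
            }
        }
    }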

[camel-kafka-connector] 01/05: Fixed flaky hdfs itest.

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 42456d6a414b3f94c4956723e1b9f6d0a0e6f498
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Fri Mar 5 22:12:27 2021 +0100

    Fixed flaky hdfs itest.
---
 .../hdfs/sink/CamelSinkHDFSITCase.java             |  28 +++--
 .../camel/kafkaconnector/hdfs/utils/HDFSEasy.java  |  26 ++++-
 tests/itests-netty-http/pom.xml                    |   5 +
 .../surce/CamelNettyHTTPPropertyFactory.java       |  61 ++++++++++
 .../surce/CamelSourceNettyHTTPITCase.java          | 123 +++++++++++++++++++++
 5 files changed, 231 insertions(+), 12 deletions(-)

diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
index f12f310..55cf21f 100644
--- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
+++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
@@ -17,11 +17,6 @@
 
 package org.apache.camel.kafkaconnector.hdfs.sink;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
@@ -37,9 +32,15 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.runners.model.InitializationError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -74,16 +75,23 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
     }
 
     @BeforeEach
-    public void setUp() throws IOException, URISyntaxException {
+    public void setUp() throws IOException, URISyntaxException, InitializationError {
         topicName = getTopicForTest(this);
         hdfsEasy = new HDFSEasy(hdfsService.getHDFSHost(), hdfsService.getPort());
 
         String currentPath = "/test" + TestUtils.randomWithRange(0, 256) + "/";
         currentBasePath = new Path(currentPath);
 
-        if (!hdfsEasy.delete(currentBasePath)) {
-            // This is OK: directory may not exist on the path
-            LOG.debug("The directory at {} was not removed", currentBasePath.getName());
+        boolean hdfsServiceCorrectlyStarted = TestUtils.waitFor(() -> hdfsEasy.createFile(new Path(currentBasePath, "initTest"), "test")
+                                                                        &&  hdfsEasy.delete(new Path(currentBasePath, "initTest")));
+
+        if(hdfsServiceCorrectlyStarted) {
+            if (!hdfsEasy.delete(currentBasePath)) {
+                // This is OK: directory may not exist on the path
+                LOG.debug("The directory at {} was not removed", currentBasePath.getName());
+            }
+        } else {
+            throw new InitializationError("HDFS Service didn't start properly.");
         }
     }
 
@@ -136,7 +144,7 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
 
             LOG.debug("Retrieved file {} with contents: {}", f.getPath(), contents);
             boolean contains = contents.contains(matchString);
-            assertTrue(contains, "Unexpected content for the remote file " + f.getPath().getName());
+            assertTrue(contains, "Unexpected content for the remote file " + f.getPath().getName() + " content: [" + contents + "] should contain [" + matchString + "]");
         } catch (IOException e) {
             LOG.debug("Reading returned file {} failed: {}", f.getPath(), e.getMessage());
             fail("I/O error: " + e.getMessage());
diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
index 7733fe8..4e95191 100644
--- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
+++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
@@ -26,6 +26,7 @@ import java.util.Scanner;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -110,8 +111,7 @@ public class HDFSEasy {
         try {
             return countFiles(path) >= minFiles;
         } catch (Exception e) {
-            LOG.warn("I/O exception while checking if file {} exists", path.getName());
-
+            LOG.warn("I/O exception: {} due to {} while checking if file {} exists", e.getMessage(), e.getCause(), path.getName());
             return false;
         }
     }
@@ -133,4 +133,26 @@ public class HDFSEasy {
             return false;
         }
     }
+
+    public boolean createFile(Path filePath, String content) {
+        FSDataOutputStream streamWriter = null;
+        try {
+            streamWriter = dfs.create(filePath);
+            streamWriter.writeBytes(content);
+            streamWriter.flush();
+        } catch (IOException e) {
+            LOG.debug("Error in file creation: " + e.getMessage());
+            return false;
+        } finally {
+            if (streamWriter != null) {
+                try {
+                    streamWriter.close();
+                } catch (IOException e) {
+                    LOG.debug("Error in file creation during stream close: " + e.getMessage());
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
 }
diff --git a/tests/itests-netty-http/pom.xml b/tests/itests-netty-http/pom.xml
index 16238cf..ddb8bf7 100644
--- a/tests/itests-netty-http/pom.xml
+++ b/tests/itests-netty-http/pom.xml
@@ -52,5 +52,10 @@
             <artifactId>mockwebserver</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java
new file mode 100644
index 0000000..1562328
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.surce;
+
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+final class CamelNettyHTTPPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyHTTPPropertyFactory> {
+    private CamelNettyHTTPPropertyFactory() {
+
+    }
+
+    public CamelNettyHTTPPropertyFactory withHost(String host) {
+        return setProperty("camel.source.path.host", host);
+    }
+
+    public CamelNettyHTTPPropertyFactory withProtocol(String protocol) {
+        return setProperty("camel.source.path.protocol", protocol);
+    }
+
+    public CamelNettyHTTPPropertyFactory withPort(int port) {
+        return setProperty("camel.source.path.port", String.valueOf(port));
+    }
+
+    public CamelNettyHTTPPropertyFactory withSync(boolean sync) {
+        return setProperty("camel.source.endpoint.sync", String.valueOf(sync));
+    }
+
+    public CamelNettyHTTPPropertyFactory withReceiveBufferSize(int size) {
+        return setProperty("camel.source.endpoint.receiveBufferSize", String.valueOf(size));
+    }
+
+    public CamelNettyHTTPPropertyFactory withCamelTypeConverterTransformTo(String targetClass) {
+        setProperty("transforms", "cameltypeconverter");
+        setProperty("transforms.cameltypeconverter.type", "org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value");
+        return setProperty("transforms.cameltypeconverter.target.type", targetClass);
+    }
+
+    public static CamelNettyHTTPPropertyFactory basic() {
+        return new CamelNettyHTTPPropertyFactory()
+                .withTasksMax(1)
+                .withName("CamelNettyHttpSourceConnector")
+                .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java
new file mode 100644
index 0000000..e7e6468
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.surce;
+
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class);
+    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost");
+    private static final String TEST_MESSAGE = "testMessage";
+
+    private String topicName;
+
+    private final int expect = 1;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-netty-http-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        topicName = getTopicForTest(this);
+    }
+
+    @AfterEach
+    public void tearDown() {}
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyHTTPPropertyFactory.basic()
+                .withKafkaTopic(topicName)
+                .withReceiveBufferSize(10)
+                .withHost("0.0.0.0")
+                .withPort(HTTP_PORT)
+                .withProtocol("http")
+                .withCamelTypeConverterTransformTo("java.lang.String");
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Override
+    protected void produceTestData() {
+        int retriesLeft = 10;
+        boolean success = false;
+        while(retriesLeft > 0 && !success) {
+            try (final CloseableHttpClient httpclient = HttpClients.createDefault()) {
+
+                byte[] ipAddr = new byte[]{127, 0, 0, 1};
+                InetAddress localhost = InetAddress.getByAddress(ipAddr);
+                final HttpPost httpPost = new HttpPost("http://" + localhost.getHostAddress() + ":" + HTTP_PORT);
+
+                LOG.info("Executing request {} {}", httpPost.getMethod(), httpPost.getURI());
+
+                httpPost.setEntity(new StringEntity(TEST_MESSAGE));
+
+                CloseableHttpResponse response = httpclient.execute(httpPost);
+                assertEquals(200, response.getStatusLine().getStatusCode());
+                response.close();
+                httpPost.releaseConnection();
+                success = true;
+                LOG.info("Request success at {} attempt.", retriesLeft);
+            } catch (IOException e) {
+                if(retriesLeft == 1) {
+                    e.printStackTrace();
+                    fail("There should be no exceptions in sending the http test message.");
+                } else {
+                    retriesLeft--;
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException interruptedException) {
+                        interruptedException.printStackTrace();
+                    }
+                }
+            }
+        }
+    }
+
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
+        assertEquals(expect, received, "Didn't process the expected amount of messages");
+        assertEquals(TEST_MESSAGE, consumer.consumedMessages().get(0).value().toString());
+    }
+}
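
The heart of the flakiness fix is the readiness probe added to setUp() in CamelSinkHDFSITCase: the test repeatedly tries to create and then delete a throwaway file through HDFSEasy and only proceeds once both operations succeed, otherwise setup aborts with InitializationError. A condensed, self-contained sketch of that wait-for-probe pattern, using the local filesystem as a stand-in for HDFS (the waitFor helper and paths below are illustrative assumptions, not the project's TestUtils):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.function.BooleanSupplier;

    public final class ReadinessProbeSketch {

        // Poll a condition until it holds or the retry budget runs out.
        static boolean waitFor(BooleanSupplier condition, int attempts, long sleepMillis) throws InterruptedException {
            for (int i = 0; i < attempts; i++) {
                if (condition.getAsBoolean()) {
                    return true;
                }
                Thread.sleep(sleepMillis);
            }
            return false;
        }

        // Probe the storage by creating and deleting a throwaway file,
        // mirroring the createFile(...) && delete(...) check in the commit.
        static boolean storageIsReady(Path basePath) {
            Path probe = basePath.resolve("initTest");
            try {
                Files.createDirectories(basePath);
                Files.writeString(probe, "test");
                Files.delete(probe);
                return true;
            } catch (IOException e) {
                return false;
            }
        }

        public static void main(String[] args) throws Exception {
            Path base = Path.of(System.getProperty("java.io.tmpdir"), "readiness-probe-demo");
            if (!waitFor(() -> storageIsReady(base), 10, 100)) {
                throw new IllegalStateException("Service didn't start properly.");
            }
            System.out.println("Storage is ready; test setup can continue.");
        }
    }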


[camel-kafka-connector] 04/05: Fixed itest for netty-http.

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit f6a3e04983ecd03cfd6a1bba3423425f00f02ec5
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Sat Mar 6 02:44:30 2021 +0100

    Fixed itest for netty-http.
---
 .../nettyhttp/sink/CamelSinkNettyhttpITCase.java   |   5 +-
 .../source/CamelNettyhttpPropertyFactory.java      |  63 ------------
 .../source/CamelSourceNettyHTTPITCase.java         |   2 +-
 .../source/CamelSourceNettyhttpITCase.java         | 109 ---------------------
 tests/pom.xml                                      |   1 -
 5 files changed, 4 insertions(+), 176 deletions(-)

diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
index cdf8b2c..96bd27a 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
@@ -20,6 +20,7 @@ package org.apache.camel.kafkaconnector.nettyhttp.sink;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
@@ -94,7 +95,7 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
                 .withHost(mockServer.getHostName())
                 .withPort(mockServer.getPort())
                 .withPath("test");
-
+        mockServer.enqueue(new MockResponse().setResponseCode(200));
         runTest(connectorPropertyFactory, topicName, expect);
     }
 
@@ -105,7 +106,7 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
                 .withTopics(topicName)
                 .withUrl("http", mockServer.getHostName(), mockServer.getPort(), "test")
                 .buildUrl();
-
+        mockServer.enqueue(new MockResponse().setResponseCode(200));
         runTest(connectorPropertyFactory, topicName, expect);
     }
 }
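
The two one-line additions above matter because okhttp3's MockWebServer only answers requests with responses that have been explicitly enqueued; without the enqueue the sink's HTTP call never receives a scripted reply and the test hangs or fails. A minimal, self-contained sketch of that interaction, using a plain OkHttp client in place of the connector (purely illustrative, not the project's test code):

    import okhttp3.OkHttpClient;
    import okhttp3.Request;
    import okhttp3.Response;
    import okhttp3.mockwebserver.MockResponse;
    import okhttp3.mockwebserver.MockWebServer;

    public class MockWebServerSketch {
        public static void main(String[] args) throws Exception {
            MockWebServer mockServer = new MockWebServer();
            // Without this enqueue the server has nothing scripted to reply with.
            mockServer.enqueue(new MockResponse().setResponseCode(200));
            mockServer.start();
            try {
                OkHttpClient client = new OkHttpClient();
                Request request = new Request.Builder()
                        .url(mockServer.url("/test"))
                        .build();
                try (Response response = client.newCall(request).execute()) {
                    System.out.println("HTTP status: " + response.code()); // prints 200
                }
            } finally {
                mockServer.shutdown();
            }
        }
    }
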
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java
deleted file mode 100644
index d97340f..0000000
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.nettyhttp.source;
-
-import org.apache.camel.LoggingLevel;
-import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
-import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
-
-final class CamelNettyhttpPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyhttpPropertyFactory> {
-
-    private CamelNettyhttpPropertyFactory() {
-    }
-
-    public CamelNettyhttpPropertyFactory withProtocol(String value) {
-        return setProperty("camel.source.path.protocol", value);
-    }
-
-    public CamelNettyhttpPropertyFactory withHost(String value) {
-        return setProperty("camel.source.path.host", value);
-    }
-
-    public CamelNettyhttpPropertyFactory withPort(int value) {
-        return setProperty("camel.source.path.port", value);
-    }
-
-    public CamelNettyhttpPropertyFactory withPath(String value) {
-        return setProperty("camel.source.path.path", value);
-    }
-
-    public EndpointUrlBuilder<CamelNettyhttpPropertyFactory> withUrl(String protocol, String host, int port, String path) {
-        String url = String.format("netty-http:%s://%s:%s/%s", protocol, host, port, path);
-        return new EndpointUrlBuilder<>(this::withSourceUrl, url);
-    }
-
-    public static CamelNettyhttpPropertyFactory basic() {
-        return new CamelNettyhttpPropertyFactory()
-                .withName("CamelNettyhttpSourceConnector")
-                .withTasksMax(1)
-                .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector")
-                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
-                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter")
-                .withTransformsConfig("tostring")
-                .withEntry("type", "org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value")
-                .withEntry("target.type", "java.lang.String")
-                .end()
-                .withSourceContentLogginglevel(LoggingLevel.DEBUG);
-    }
-}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
index 41cb6e1..48bcb59 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
@@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class);
-    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost");
+    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost", 30000, 40000);
     private static final String TEST_MESSAGE = "testMessage";
 
     private String topicName;
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java
deleted file mode 100644
index e1c28de..0000000
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.nettyhttp.source;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
-import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
-import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.fail;
-
-@Disabled("Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string #969")
-public class CamelSourceNettyhttpITCase extends CamelSourceTestSupport {
-    private final String host = NetworkUtils.getHostname();
-    private final int port = NetworkUtils.getFreePort();
-
-    private final int expect = 1;
-    private String topicName;
-
-    @Override
-    protected String[] getConnectorsInTest() {
-        return new String[] {"camel-netty-http-kafka-connector"};
-    }
-
-    @BeforeEach
-    public void setUp() {
-        topicName = getTopicForTest(this);
-    }
-
-    @Override
-    protected void produceTestData() {
-        TestUtils.waitFor(() -> NetworkUtils.portIsOpen(host, port));
-        sendMessage();
-    }
-
-    void sendMessage() {
-        OkHttpClient client = new OkHttpClient();
-        RequestBody body = RequestBody.create(MediaType.get("text/plain; charset=utf-8"), "Hello CKC!");
-        Request request = new Request.Builder()
-                .url("http://" + host + ":" + port + "/test")
-                .post(body)
-                .build();
-        try (Response response = client.newCall(request).execute()) {
-            assertEquals(200, response.code(), "Source endpoint didn't return 200");
-        } catch (IOException e) {
-            fail(e.getMessage(), e);
-        }
-    }
-
-    @Override
-    protected void verifyMessages(TestMessageConsumer<?> consumer) {
-        int received = consumer.consumedMessages().size();
-        String receivedObject = (String) consumer.consumedMessages().get(0).value();
-        assertEquals(expect, received, "Did not receive as many messages as expected");
-        assertEquals("Hello CKC!", receivedObject, "Received message content differed");
-    }
-
-    @Test
-    @Timeout(30)
-    public void testLaunchConnector() throws ExecutionException, InterruptedException {
-        CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
-                .withKafkaTopic(topicName)
-                .withProtocol("http")
-                .withHost(host)
-                .withPort(port)
-                .withPath("test");
-
-        runTest(connectorPropertyFactory, topicName, expect);
-    }
-
-    @Test
-    @Timeout(30)
-    public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException {
-        CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
-                .withKafkaTopic(topicName)
-                .withUrl("http", host, port, "test")
-                .buildUrl();
-
-        runTest(connectorPropertyFactory, topicName, expect);
-    }
-}
diff --git a/tests/pom.xml b/tests/pom.xml
index 16eeb17..0c3ff80 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -54,7 +54,6 @@
         <module>itests-salesforce</module>
         <module>itests-hdfs</module>
         <module>itests-mongodb</module>
-        <module>itests-netty-http</module>
         <module>itests-jdbc</module>
         <module>itests-azure-storage-blob</module>
         <module>itests-azure-storage-queue</module>


[camel-kafka-connector] 02/05: Properly handling UnitOfWork by completing it at the last possible moment, properly fix #202

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 2a1a50760146ef13a52ce0b622cedf39e04b7c34
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Fri Mar 5 22:15:53 2021 +0100

    Properly handling UnitOfWork by completing it at the last possible moment, properly fix #202
---
 core/pom.xml                                       | 19 ++++-
 .../kafkaconnector/CamelSourceConnectorConfig.java |  5 ++
 .../camel/kafkaconnector/CamelSourceRecord.java    | 43 ++++++++++
 .../camel/kafkaconnector/CamelSourceTask.java      | 96 +++++++++++++++++-----
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 50 +++++++++++
 .../CamelTypeConverterTransformTest.java           | 25 ++++++
 parent/pom.xml                                     |  8 +-
 7 files changed, 219 insertions(+), 27 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 2e32d16..f59f70d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -49,6 +49,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>camel-seda</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-kafka</artifactId>
         </dependency>
         <dependency>
@@ -56,6 +60,13 @@
             <artifactId>camel-core-languages</artifactId>
         </dependency>
 
+        <!-- Tools -->
+        <dependency>
+            <groupId>org.jctools</groupId>
+            <artifactId>jctools-core</artifactId>
+            <version>${version.jctools}</version>
+        </dependency>
+
         <!-- Kafka -->
         <dependency>
             <groupId>org.apache.kafka</groupId>
@@ -108,22 +119,22 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-seda</artifactId>
+            <artifactId>camel-timer</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-timer</artifactId>
+            <artifactId>camel-log</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-log</artifactId>
+            <artifactId>camel-slack</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-slack</artifactId>
+            <artifactId>camel-netty-http</artifactId>
             <scope>test</scope>
         </dependency>
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index bb4f8f8..4acfa62 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -54,6 +54,10 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
     public static final String CAMEL_SOURCE_MAX_POLL_DURATION_CONF = "camel.source.maxPollDuration";
     public static final String CAMEL_SOURCE_MAX_POLL_DURATION_DOC = "The maximum time in milliseconds spent in a single call to poll()";
 
+    public static final Integer CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DEFAULT = 1024;
+    public static final String CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF = "camel.source.maxNotCommittedRecords";
+    public static final String CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DOC = "The maximum number of non committed kafka connect records that can be tolerated before stop polling new records (rounded to the next power of 2) with a minimum of 4.";
+
     public static final Long CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT = 1000L;
     public static final String CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF = "camel.source.pollingConsumerQueueSize";
     public static final String CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC = "The queue size for the internal hand-off queue between the polling consumer, and producers sending data into the queue.";
@@ -82,6 +86,7 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
         .define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.Importance.HIGH, TOPIC_DOC)
         .define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC)
         .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
+        .define(CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF, Type.INT, CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DOC)
         .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC)
         .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
         .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
new file mode 100644
index 0000000..87934ef
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
@@ -0,0 +1,43 @@
+package org.apache.camel.kafkaconnector;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Map;
+
+public class CamelSourceRecord extends SourceRecord {
+    private Integer claimCheck = null;
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema valueSchema, Object value) {
+        super(sourcePartition, sourceOffset, topic, partition, valueSchema, value);
+    }
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Schema valueSchema, Object value) {
+        super(sourcePartition, sourceOffset, topic, valueSchema, value);
+    }
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Schema keySchema, Object key, Schema valueSchema, Object value) {
+        super(sourcePartition, sourceOffset, topic, keySchema, key, valueSchema, value);
+    }
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value) {
+        super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value);
+    }
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) {
+        super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value, timestamp);
+    }
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) {
+        super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value, timestamp, headers);
+    }
+
+    public Integer getClaimCheck() {
+        return claimCheck;
+    }
+
+    public void setClaimCheck(Integer claimCheck) {
+        this.claimCheck = claimCheck;
+    }
+}
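
CamelSourceRecord above exists mainly to carry a claim check: an index that ties each emitted SourceRecord back to the Camel exchange it was built from, so the exchange's UnitOfWork can be completed only when Kafka Connect acknowledges the record in commitRecord(). A stripped-down sketch of that claim-check bookkeeping, with a plain ArrayBlockingQueue standing in for the JCTools SpscArrayQueue used in the commit (names and types are illustrative, not the connector's API):

    import java.util.concurrent.ArrayBlockingQueue;

    // Minimal claim-check correlator: hand out a slot number with each outgoing
    // record, park the pending exchange in an array, release it on acknowledgement.
    public class ClaimCheckCorrelator<T> {
        private final ArrayBlockingQueue<Integer> freeSlots;
        private final Object[] pending;

        public ClaimCheckCorrelator(int capacity) {
            freeSlots = new ArrayBlockingQueue<>(capacity);
            pending = new Object[capacity];
            for (int i = 0; i < capacity; i++) {
                freeSlots.add(i);
            }
        }

        // Called from poll(): reserve a slot for an in-flight exchange.
        // Returns -1 when every slot is taken, i.e. too many records are still un-acked.
        public int checkIn(T exchange) {
            Integer slot = freeSlots.poll();
            if (slot == null) {
                return -1;
            }
            pending[slot] = exchange;
            return slot;
        }

        // Called from commitRecord(): look up the exchange by its claim check and
        // free the slot, so the unit of work is completed at the last possible moment.
        @SuppressWarnings("unchecked")
        public T checkOut(int claimCheck) {
            T exchange = (T) pending[claimCheck];
            pending[claimCheck] = null;
            freeSlots.add(claimCheck);
            return exchange;
        }
    }
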
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 16e6bfc..03d0c1a 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -16,30 +16,37 @@
  */
 package org.apache.camel.kafkaconnector;
 
-import java.math.BigDecimal;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.StreamCache;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
 import org.apache.camel.kafkaconnector.utils.SchemaHelper;
 import org.apache.camel.kafkaconnector.utils.TaskHelper;
+import org.apache.camel.support.UnitOfWorkHelper;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
 public class CamelSourceTask extends SourceTask {
     public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
     public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";
@@ -49,18 +56,23 @@ public class CamelSourceTask extends SourceTask {
     private static final String CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX = "camel.source.endpoint.";
     private static final String CAMEL_SOURCE_PATH_PROPERTIES_PREFIX = "camel.source.path.";
 
-    private static final String LOCAL_URL = "direct:end";
+    private static final String LOCAL_URL = "seda:end";
 
     private CamelKafkaConnectMain cms;
     private PollingConsumer consumer;
     private String[] topics;
     private Long maxBatchPollSize;
     private Long maxPollDuration;
+    private Integer maxNotCommittedRecords;
     private String camelMessageHeaderKey;
     private LoggingLevel loggingLevel = LoggingLevel.OFF;
+    private Exchange[] exchangesWaitingForAck;
+    //the assumption is that at most 1 thread is running poll() method and at most 1 thread is running commitRecord()
+    private SpscArrayQueue<Integer> freeSlots;
     private boolean mapProperties;
     private boolean mapHeaders;
 
+
     @Override
     public String version() {
         return VersionUtil.getVersion();
@@ -82,6 +94,7 @@ public class CamelSourceTask extends SourceTask {
 
             maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
             maxPollDuration = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF);
+            maxNotCommittedRecords = config.getInt(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF);
 
             camelMessageHeaderKey = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF);
 
@@ -105,10 +118,24 @@ public class CamelSourceTask extends SourceTask {
             final String headersRemovePattern = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
             mapProperties = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);
             mapHeaders = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF);
-            
+
             topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
-            String localUrl = getLocalUrlWithPollingOptions(config);
+            long pollingConsumerQueueSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF);
+            long pollingConsumerBlockTimeout = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF);
+            boolean pollingConsumerBlockWhenFull = config.getBoolean(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF);
+            String localUrl = getLocalUrlWithPollingOptions(pollingConsumerQueueSize, pollingConsumerBlockTimeout, pollingConsumerBlockWhenFull);
+
+            freeSlots = new SpscArrayQueue<>(maxNotCommittedRecords);
+            freeSlots.fill(new MessagePassingQueue.Supplier<Integer>() {
+                int i = 0;
+                @Override
+                public Integer get() {
+                    return i++;
+                }
+            });
+            //needs to be done like this because freeSlots capacity is rounded to the next power of 2 of maxNotCommittedRecords
+            exchangesWaitingForAck = new Exchange[freeSlots.capacity()];
 
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
@@ -155,13 +182,14 @@ public class CamelSourceTask extends SourceTask {
 
     @Override
     public synchronized List<SourceRecord> poll() {
+        LOG.debug("Number of records waiting an ack: {}", freeSlots.capacity() - freeSlots.size());
         final long startPollEpochMilli = Instant.now().toEpochMilli();
 
         long remaining = remaining(startPollEpochMilli, maxPollDuration);
         long collectedRecords = 0L;
 
         List<SourceRecord> records = new ArrayList<>();
-        while (collectedRecords < maxBatchPollSize && remaining > 0) {
+        while (collectedRecords < maxBatchPollSize && freeSlots.size() >= topics.length && remaining > 0) {
             Exchange exchange = consumer.receive(remaining);
             if (exchange == null) {
                 // Nothing received, abort and return what we received so far
@@ -177,31 +205,46 @@ public class CamelSourceTask extends SourceTask {
             Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
 
             final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
-            final Object messageBodyValue = exchange.getMessage().getBody();
+            Object messageBodyValue = exchange.getMessage().getBody();
 
             final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
             final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
 
             final long timestamp = calculateTimestamp(exchange);
 
+            // take into account cached Camel streams
+            if (messageBodyValue instanceof StreamCache) {
+                StreamCache sc = (StreamCache) messageBodyValue;
+                // reset to be sure that the cache is ready to be used before sending it in the record (could be useful for SMTs)
+                sc.reset();
+                try {
+                    messageBodyValue = sc.copy(exchange);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
             for (String singleTopic : topics) {
-                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema,
+                CamelSourceRecord camelRecord = new CamelSourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema,
                         messageHeaderKey, messageBodySchema, messageBodyValue, timestamp);
 
                 if (mapHeaders) {
                     if (exchange.getMessage().hasHeaders()) {
-                        setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
+                        setAdditionalHeaders(camelRecord, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
                     }
                 }
                 
                 if (mapProperties) {
                     if (exchange.hasProperties()) {
-                        setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
+                        setAdditionalHeaders(camelRecord, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
                     }
                 }
 
-                TaskHelper.logRecordContent(LOG, loggingLevel, record);
-                records.add(record);
+                TaskHelper.logRecordContent(LOG, loggingLevel, camelRecord);
+                Integer claimCheck = freeSlots.remove();
+                camelRecord.setClaimCheck(claimCheck);
+                exchangesWaitingForAck[claimCheck] = exchange;
+                LOG.debug("Record: {}, containing data from exchange: {}, is associated with claim check number: {}", camelRecord, exchange, claimCheck);
+                records.add(camelRecord);
             }
             collectedRecords++;
             remaining = remaining(startPollEpochMilli, maxPollDuration);
@@ -211,6 +254,18 @@ public class CamelSourceTask extends SourceTask {
     }
 
     @Override
+    public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {
+        //XXX: this should be a safe cast; see: https://issues.apache.org/jira/browse/KAFKA-12391
+        Integer claimCheck = ((CamelSourceRecord)record).getClaimCheck();
+        LOG.debug("Committing record with claim check number: {}", claimCheck);
+        Exchange correlatedExchange = exchangesWaitingForAck[claimCheck];
+        exchangesWaitingForAck[claimCheck] = null;
+        freeSlots.add(claimCheck);
+        UnitOfWorkHelper.doneSynchronizations(correlatedExchange, correlatedExchange.adapt(ExtendedExchange.class).handoverCompletions(), LOG);
+        LOG.debug("Record with claim check number: {} committed.", claimCheck);
+    }
+
+    @Override
     public void stop() {
         LOG.info("Stopping CamelSourceTask connector task");
         try {
@@ -301,10 +356,7 @@ public class CamelSourceTask extends SourceTask {
         }
     }
 
-    private String getLocalUrlWithPollingOptions(CamelSourceConnectorConfig config) {
-        long pollingConsumerQueueSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF);
-        long pollingConsumerBlockTimeout = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF);
-        boolean pollingConsumerBlockWhenFull = config.getBoolean(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF);
+    private String getLocalUrlWithPollingOptions(long pollingConsumerQueueSize, long pollingConsumerBlockTimeout, boolean pollingConsumerBlockWhenFull) {
         return LOCAL_URL + "?pollingConsumerQueueSize=" + pollingConsumerQueueSize + "&pollingConsumerBlockTimeout=" + pollingConsumerBlockTimeout
                + "&pollingConsumerBlockWhenFull=" + pollingConsumerBlockWhenFull;
     }
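
For readers following the CamelSourceTask changes above: the new fields implement a simple claim-check scheme. Each record handed to Kafka Connect borrows a slot index from a pre-filled single-producer/single-consumer queue, and commitRecord() returns the slot once the record has been acknowledged, at which point the corresponding Camel exchange is completed. Below is a minimal standalone sketch of that bookkeeping; the class, field and method names are illustrative only and are not the connector's own code.

    import org.jctools.queues.SpscArrayQueue;

    class ClaimCheckSketch {
        private final SpscArrayQueue<Integer> freeSlots;
        private final Object[] pending; // stands in for the in-flight Camel exchanges

        ClaimCheckSketch(int maxInFlight) {
            freeSlots = new SpscArrayQueue<>(maxInFlight);
            // pre-fill with slot indices; the actual capacity may be rounded up to the next power of 2
            for (int i = 0; i < freeSlots.capacity(); i++) {
                freeSlots.add(i);
            }
            pending = new Object[freeSlots.capacity()];
        }

        int claim(Object exchange) {
            int slot = freeSlots.remove(); // fails when every slot is taken, i.e. too many un-acked records
            pending[slot] = exchange;
            return slot;
        }

        Object ack(int slot) {
            Object exchange = pending[slot];
            pending[slot] = null;
            freeSlots.add(slot); // the slot becomes reusable by the next poll()
            return exchange;
        }
    }

Note that the poll() loop above additionally stops collecting once fewer free slots remain than there are configured topics, so an exchange fanned out to several topics can always claim one slot per topic.
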
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 21d56fc..51b4db3 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -16,15 +16,19 @@
  */
 package org.apache.camel.kafkaconnector;
 
+import java.awt.print.PrinterJob;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.kafkaconnector.utils.StringJoinerAggregator;
@@ -77,6 +81,24 @@ public class CamelSourceTaskTest {
     }
 
     @Test
+    public void testSourcePollingMaxNotCommittedRecords() {
+        final long size = 4;
+        Map<String, String> props = new HashMap<>();
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF, String.valueOf(size));
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
+
+        sendBatchOfRecords(sourceTask, size + 1);
+        List<SourceRecord> poll = sourceTask.poll();
+
+        assertEquals(4, poll.size());
+        sourceTask.stop();
+    }
+
+    @Test
     public void testSourcePollingMaxBatchPollSize() {
         final long size = 2;
         Map<String, String> props = new HashMap<>();
@@ -621,4 +643,32 @@ public class CamelSourceTaskTest {
 
         sourceTask.stop();
     }
+
+    @Test
+    public void testRequestReply() throws InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI);
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                final ProducerTemplate template = sourceTask.getCms().getProducerTemplate();
+                String result = template.requestBody(DIRECT_URI, "test", String.class);
+                assertEquals("test", result);
+            }
+        });
+
+        List<SourceRecord> poll = sourceTask.poll();
+        assertEquals(1, poll.size());
+
+        sourceTask.commitRecord(poll.get(0), null);
+
+        sourceTask.stop();
+        executor.shutdown();
+    }
 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
index 92c668b..c6cecbf 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
@@ -16,10 +16,14 @@
  */
 package org.apache.camel.kafkaconnector.transforms;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import io.netty.buffer.Unpooled;
+import org.apache.camel.component.netty.http.NettyChannelBufferStreamCache;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
@@ -63,6 +67,27 @@ public class CamelTypeConverterTransformTest {
     }
 
     @Test
+    public void testIfItConvertsNettyCorrectly() {
+        final String testMessage = "testMessage";
+        NettyChannelBufferStreamCache nettyTestValue = new NettyChannelBufferStreamCache(Unpooled.wrappedBuffer(testMessage.getBytes(Charset.defaultCharset())));
+
+        final SourceRecord connectRecord = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "1234", Schema.BYTES_SCHEMA, nettyTestValue);
+
+        final Map<String, Object> propsForValueSmt = new HashMap<>();
+        propsForValueSmt.put(CamelTypeConverterTransform.FIELD_TARGET_TYPE_CONFIG, "java.lang.String");
+
+        final Transformation<SourceRecord> transformationValue = new CamelTypeConverterTransform.Value<>();
+
+        transformationValue.configure(propsForValueSmt);
+
+        final SourceRecord transformedValueSourceRecord = transformationValue.apply(connectRecord);
+
+        assertEquals(java.lang.String.class, transformedValueSourceRecord.value().getClass());
+        assertEquals(Schema.STRING_SCHEMA, transformedValueSourceRecord.valueSchema());
+        assertEquals(testMessage, transformedValueSourceRecord.value());
+    }
+
+    @Test
     public void testIfHandlesTypeConvertersFromCamelComponents() {
         // we know we have a type converter from struct to map in dbz component, so we use this for testing
         final Schema schema = SchemaBuilder.struct()
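
The new test above drives Camel's type-converter machinery against a NettyChannelBufferStreamCache value through the CamelTypeConverterTransform SMT. At connector-configuration level the same conversion would look roughly like the sketch below; the target.type key mirrors FIELD_TARGET_TYPE_CONFIG as used in the test, but treat the exact property names as assumptions and check the generated connector documentation.

    # illustrative SMT configuration, not taken verbatim from the repository
    transforms=toString
    transforms.toString.type=org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value
    transforms.toString.target.type=java.lang.String
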
diff --git a/parent/pom.xml b/parent/pom.xml
index 85a4fa7..4230a7f 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -35,6 +35,7 @@
         <version.guava>20.0</version.guava>
         <version.javax.annotation-api>1.3.2</version.javax.annotation-api>
         <version.postgres>42.2.14</version.postgres>
+        <version.jctools>3.3.0</version.jctools>
 
         <version.maven.compiler>3.8.1</version.maven.compiler>
         <version.maven.javadoc>3.1.1</version.maven.javadoc>
@@ -57,7 +58,6 @@
         <!-- Note: we are deliberately overriding this one due to GH issue #990 -->
         <testcontainers-version>1.15.2</testcontainers-version>
 
-
         <mycila-license-version>3.0</mycila-license-version>
         <gmavenplus-plugin-version>1.9.0</gmavenplus-plugin-version>
         <groovy-version>3.0.7</groovy-version>
@@ -116,6 +116,12 @@
                 <version>${version.guava}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.jctools</groupId>
+                <artifactId>jctools-core</artifactId>
+                <version>${version.jctools}</version>
+            </dependency>
+
             <!--  Kafka dependencies -->
             <dependency>
                 <groupId>org.apache.kafka</groupId>


[camel-kafka-connector] 03/05: fix #969: NettyChannelBufferStreamCache from NettyHttpSource was not converted to String.

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 63ef1f08f02fadb0522eba288577405d6f30e026
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Fri Mar 5 22:17:03 2021 +0100

    fix #969: NettyChannelBufferStreamCache from NettyHttpSource was not converted to String.
---
 tests/itests-netty-http/pom.xml                                  | 9 +++++++++
 .../{surce => source}/CamelNettyHTTPPropertyFactory.java         | 3 +--
 .../nettyhttp/{surce => source}/CamelSourceNettyHTTPITCase.java  | 4 ++--
 tests/pom.xml                                                    | 1 +
 4 files changed, 13 insertions(+), 4 deletions(-)
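
The renamed itest classes exercise a plain netty-http source connector, the scenario in which the HTTP body used to reach the topic as an unconverted NettyChannelBufferStreamCache. A rough outline of the kind of configuration involved is sketched below; the property names follow the usual camel.source.path.* convention and the connector class name follows the generated naming scheme, so both are assumptions rather than values copied from the tests.

    # hypothetical netty-http source connector configuration, for illustration only
    connector.class=org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector
    topics=netty-http-source-topic
    camel.source.path.protocol=http
    camel.source.path.host=localhost
    camel.source.path.port=8080
    camel.source.path.path=test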

diff --git a/tests/itests-netty-http/pom.xml b/tests/itests-netty-http/pom.xml
index ddb8bf7..88a0929 100644
--- a/tests/itests-netty-http/pom.xml
+++ b/tests/itests-netty-http/pom.xml
@@ -37,6 +37,15 @@
             <scope>test</scope>
         </dependency>
 
+        <!-- test infra -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-infra-common</artifactId>
+            <version>${camel.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-netty-http</artifactId>
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java
similarity index 97%
rename from tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java
rename to tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java
index 1562328..e4df820 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java
@@ -14,8 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.camel.kafkaconnector.nettyhttp.surce;
+package org.apache.camel.kafkaconnector.nettyhttp.source;
 
 import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
 
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
similarity index 98%
rename from tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java
rename to tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
index e7e6468..41cb6e1 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
@@ -14,8 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.camel.kafkaconnector.nettyhttp.surce;
+package org.apache.camel.kafkaconnector.nettyhttp.source;
 
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
@@ -115,6 +114,7 @@ public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
         }
     }
 
+    @Override
     protected void verifyMessages(TestMessageConsumer<?> consumer) {
         int received = consumer.consumedMessages().size();
         assertEquals(expect, received, "Didn't process the expected amount of messages");
diff --git a/tests/pom.xml b/tests/pom.xml
index 0c3ff80..16eeb17 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -54,6 +54,7 @@
         <module>itests-salesforce</module>
         <module>itests-hdfs</module>
         <module>itests-mongodb</module>
+        <module>itests-netty-http</module>
         <module>itests-jdbc</module>
         <module>itests-azure-storage-blob</module>
         <module>itests-azure-storage-queue</module>