You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/03/07 13:45:35 UTC

[camel-kafka-connector] 02/05: Properly handling UnitOfWork by completing it at the last possible moment, properly fix #202

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 2a1a50760146ef13a52ce0b622cedf39e04b7c34
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Fri Mar 5 22:15:53 2021 +0100

    Properly handling UnitOfWork by completing it at the last possible moment, properly fix #202
---
 core/pom.xml                                       | 19 ++++-
 .../kafkaconnector/CamelSourceConnectorConfig.java |  5 ++
 .../camel/kafkaconnector/CamelSourceRecord.java    | 43 ++++++++++
 .../camel/kafkaconnector/CamelSourceTask.java      | 96 +++++++++++++++++-----
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 50 +++++++++++
 .../CamelTypeConverterTransformTest.java           | 25 ++++++
 parent/pom.xml                                     |  8 +-
 7 files changed, 219 insertions(+), 27 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 2e32d16..f59f70d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -49,6 +49,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>camel-seda</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-kafka</artifactId>
         </dependency>
         <dependency>
@@ -56,6 +60,13 @@
             <artifactId>camel-core-languages</artifactId>
         </dependency>
 
+        <!-- Tools -->
+        <dependency>
+            <groupId>org.jctools</groupId>
+            <artifactId>jctools-core</artifactId>
+            <version>${version.jctools}</version>
+        </dependency>
+
         <!-- Kafka -->
         <dependency>
             <groupId>org.apache.kafka</groupId>
@@ -108,22 +119,22 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-seda</artifactId>
+            <artifactId>camel-timer</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-timer</artifactId>
+            <artifactId>camel-log</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-log</artifactId>
+            <artifactId>camel-slack</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-slack</artifactId>
+            <artifactId>camel-netty-http</artifactId>
             <scope>test</scope>
         </dependency>
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index bb4f8f8..4acfa62 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -54,6 +54,10 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
     public static final String CAMEL_SOURCE_MAX_POLL_DURATION_CONF = "camel.source.maxPollDuration";
     public static final String CAMEL_SOURCE_MAX_POLL_DURATION_DOC = "The maximum time in milliseconds spent in a single call to poll()";
 
+    public static final Integer CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DEFAULT = 1024;
+    public static final String CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF = "camel.source.maxNotCommittedRecords";
+    public static final String CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DOC = "The maximum number of non committed kafka connect records that can be tolerated before stop polling new records (rounded to the next power of 2) with a minimum of 4.";
+
     public static final Long CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT = 1000L;
     public static final String CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF = "camel.source.pollingConsumerQueueSize";
     public static final String CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC = "The queue size for the internal hand-off queue between the polling consumer, and producers sending data into the queue.";
@@ -82,6 +86,7 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
         .define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.Importance.HIGH, TOPIC_DOC)
         .define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC)
         .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
+        .define(CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF, Type.INT, CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DOC)
         .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC)
         .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
         .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
new file mode 100644
index 0000000..87934ef
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
@@ -0,0 +1,43 @@
+package org.apache.camel.kafkaconnector;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Map;
+
+public class CamelSourceRecord extends SourceRecord {
+    private Integer claimCheck = null;
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema valueSchema, Object value) {
+        super(sourcePartition, sourceOffset, topic, partition, valueSchema, value);
+    }
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Schema valueSchema, Object value) {
+        super(sourcePartition, sourceOffset, topic, valueSchema, value);
+    }
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Schema keySchema, Object key, Schema valueSchema, Object value) {
+        super(sourcePartition, sourceOffset, topic, keySchema, key, valueSchema, value);
+    }
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value) {
+        super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value);
+    }
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) {
+        super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value, timestamp);
+    }
+
+    public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) {
+        super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value, timestamp, headers);
+    }
+
+    public Integer getClaimCheck() {
+        return claimCheck;
+    }
+
+    public void setClaimCheck(Integer claimCheck) {
+        this.claimCheck = claimCheck;
+    }
+}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 16e6bfc..03d0c1a 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -16,30 +16,37 @@
  */
 package org.apache.camel.kafkaconnector;
 
-import java.math.BigDecimal;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.StreamCache;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
 import org.apache.camel.kafkaconnector.utils.SchemaHelper;
 import org.apache.camel.kafkaconnector.utils.TaskHelper;
+import org.apache.camel.support.UnitOfWorkHelper;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
 public class CamelSourceTask extends SourceTask {
     public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
     public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";
@@ -49,18 +56,23 @@ public class CamelSourceTask extends SourceTask {
     private static final String CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX = "camel.source.endpoint.";
     private static final String CAMEL_SOURCE_PATH_PROPERTIES_PREFIX = "camel.source.path.";
 
-    private static final String LOCAL_URL = "direct:end";
+    private static final String LOCAL_URL = "seda:end";
 
     private CamelKafkaConnectMain cms;
     private PollingConsumer consumer;
     private String[] topics;
     private Long maxBatchPollSize;
     private Long maxPollDuration;
+    private Integer maxNotCommittedRecords;
     private String camelMessageHeaderKey;
     private LoggingLevel loggingLevel = LoggingLevel.OFF;
+    private Exchange[] exchangesWaitingForAck;
+    //the assumption is that at most 1 thread is running poll() method and at most 1 thread is running commitRecord()
+    private SpscArrayQueue<Integer> freeSlots;
     private boolean mapProperties;
     private boolean mapHeaders;
 
+
     @Override
     public String version() {
         return VersionUtil.getVersion();
@@ -82,6 +94,7 @@ public class CamelSourceTask extends SourceTask {
 
             maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
             maxPollDuration = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF);
+            maxNotCommittedRecords = config.getInt(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF);
 
             camelMessageHeaderKey = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF);
 
@@ -105,10 +118,24 @@ public class CamelSourceTask extends SourceTask {
             final String headersRemovePattern = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
             mapProperties = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);
             mapHeaders = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF);
-            
+
             topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
-            String localUrl = getLocalUrlWithPollingOptions(config);
+            long pollingConsumerQueueSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF);
+            long pollingConsumerBlockTimeout = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF);
+            boolean pollingConsumerBlockWhenFull = config.getBoolean(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF);
+            String localUrl = getLocalUrlWithPollingOptions(pollingConsumerQueueSize, pollingConsumerBlockTimeout, pollingConsumerBlockWhenFull);
+
+            freeSlots = new SpscArrayQueue<>(maxNotCommittedRecords);
+            freeSlots.fill(new MessagePassingQueue.Supplier<Integer>() {
+                int i = 0;
+                @Override
+                public Integer get() {
+                    return i++;
+                }
+            });
+            //needs to be done like this because freeSlots capacity is rounded to the next power of 2 of maxNotCommittedRecords
+            exchangesWaitingForAck = new Exchange[freeSlots.capacity()];
 
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
@@ -155,13 +182,14 @@ public class CamelSourceTask extends SourceTask {
 
     @Override
     public synchronized List<SourceRecord> poll() {
+        LOG.debug("Number of records waiting an ack: {}", freeSlots.capacity() - freeSlots.size());
         final long startPollEpochMilli = Instant.now().toEpochMilli();
 
         long remaining = remaining(startPollEpochMilli, maxPollDuration);
         long collectedRecords = 0L;
 
         List<SourceRecord> records = new ArrayList<>();
-        while (collectedRecords < maxBatchPollSize && remaining > 0) {
+        while (collectedRecords < maxBatchPollSize && freeSlots.size() >= topics.length && remaining > 0) {
             Exchange exchange = consumer.receive(remaining);
             if (exchange == null) {
                 // Nothing received, abort and return what we received so far
@@ -177,31 +205,46 @@ public class CamelSourceTask extends SourceTask {
             Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
 
             final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
-            final Object messageBodyValue = exchange.getMessage().getBody();
+            Object messageBodyValue = exchange.getMessage().getBody();
 
             final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
             final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
 
             final long timestamp = calculateTimestamp(exchange);
 
+            // take in account Cached camel streams
+            if (messageBodyValue instanceof StreamCache) {
+                StreamCache sc = (StreamCache) messageBodyValue;
+                // reset to be sure that the cache is ready to be used before sending it in the record (could be useful for SMTs)
+                sc.reset();
+                try {
+                    messageBodyValue = sc.copy(exchange);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
             for (String singleTopic : topics) {
-                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema,
+                CamelSourceRecord camelRecord = new CamelSourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema,
                         messageHeaderKey, messageBodySchema, messageBodyValue, timestamp);
 
                 if (mapHeaders) {
                     if (exchange.getMessage().hasHeaders()) {
-                        setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
+                        setAdditionalHeaders(camelRecord, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
                     }
                 }
                 
                 if (mapProperties) {
                     if (exchange.hasProperties()) {
-                        setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
+                        setAdditionalHeaders(camelRecord, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
                     }
                 }
 
-                TaskHelper.logRecordContent(LOG, loggingLevel, record);
-                records.add(record);
+                TaskHelper.logRecordContent(LOG, loggingLevel, camelRecord);
+                Integer claimCheck = freeSlots.remove();
+                camelRecord.setClaimCheck(claimCheck);
+                exchangesWaitingForAck[claimCheck] = exchange;
+                LOG.debug("Record: {}, containing data from exchange: {}, is associated with claim check number: {}", camelRecord, exchange, claimCheck);
+                records.add(camelRecord);
             }
             collectedRecords++;
             remaining = remaining(startPollEpochMilli, maxPollDuration);
@@ -211,6 +254,18 @@ public class CamelSourceTask extends SourceTask {
     }
 
     @Override
+    public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {
+        ///XXX: this should be a safe cast please see: https://issues.apache.org/jira/browse/KAFKA-12391
+        Integer claimCheck = ((CamelSourceRecord)record).getClaimCheck();
+        LOG.debug("Committing record with claim check number: {}", claimCheck);
+        Exchange correlatedExchange = exchangesWaitingForAck[claimCheck];
+        exchangesWaitingForAck[claimCheck] = null;
+        freeSlots.add(claimCheck);
+        UnitOfWorkHelper.doneSynchronizations(correlatedExchange, correlatedExchange.adapt(ExtendedExchange.class).handoverCompletions(), LOG);
+        LOG.debug("Record with claim check number: {} committed.", claimCheck);
+    }
+
+    @Override
     public void stop() {
         LOG.info("Stopping CamelSourceTask connector task");
         try {
@@ -301,10 +356,7 @@ public class CamelSourceTask extends SourceTask {
         }
     }
 
-    private String getLocalUrlWithPollingOptions(CamelSourceConnectorConfig config) {
-        long pollingConsumerQueueSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF);
-        long pollingConsumerBlockTimeout = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF);
-        boolean pollingConsumerBlockWhenFull = config.getBoolean(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF);
+    private String getLocalUrlWithPollingOptions(long pollingConsumerQueueSize, long pollingConsumerBlockTimeout, boolean pollingConsumerBlockWhenFull) {
         return LOCAL_URL + "?pollingConsumerQueueSize=" + pollingConsumerQueueSize + "&pollingConsumerBlockTimeout=" + pollingConsumerBlockTimeout
                + "&pollingConsumerBlockWhenFull=" + pollingConsumerBlockWhenFull;
     }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 21d56fc..51b4db3 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -16,15 +16,19 @@
  */
 package org.apache.camel.kafkaconnector;
 
+import java.awt.print.PrinterJob;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.kafkaconnector.utils.StringJoinerAggregator;
@@ -77,6 +81,24 @@ public class CamelSourceTaskTest {
     }
 
     @Test
+    public void testSourcePollingMaxNotCommittedRecords() {
+        final long size = 4;
+        Map<String, String> props = new HashMap<>();
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF, String.valueOf(size));
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
+
+        sendBatchOfRecords(sourceTask, size + 1);
+        List<SourceRecord> poll = sourceTask.poll();
+
+        assertEquals(4, poll.size());
+        sourceTask.stop();
+    }
+
+    @Test
     public void testSourcePollingMaxBatchPollSize() {
         final long size = 2;
         Map<String, String> props = new HashMap<>();
@@ -621,4 +643,32 @@ public class CamelSourceTaskTest {
 
         sourceTask.stop();
     }
+
+    @Test
+    public void testRequestReply() throws InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI);
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                final ProducerTemplate template = sourceTask.getCms().getProducerTemplate();
+                String result = template.requestBody(DIRECT_URI, "test", String.class);
+                assertEquals("test", result);
+            }
+        });
+
+        List<SourceRecord> poll = sourceTask.poll();
+        assertEquals(1, poll.size());
+
+        sourceTask.commitRecord(poll.get(0), null);
+
+        sourceTask.stop();
+        executor.shutdown();
+    }
 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
index 92c668b..c6cecbf 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
@@ -16,10 +16,14 @@
  */
 package org.apache.camel.kafkaconnector.transforms;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import io.netty.buffer.Unpooled;
+import org.apache.camel.component.netty.http.NettyChannelBufferStreamCache;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
@@ -63,6 +67,27 @@ public class CamelTypeConverterTransformTest {
     }
 
     @Test
+    public void testIfItConvertsNettyCorrectly() {
+        final String testMessage = "testMessage";
+        NettyChannelBufferStreamCache nettyTestValue = new NettyChannelBufferStreamCache(Unpooled.wrappedBuffer(testMessage.getBytes(Charset.defaultCharset())));
+
+        final SourceRecord connectRecord = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "1234", Schema.BYTES_SCHEMA, nettyTestValue);
+
+        final Map<String, Object> propsForValueSmt = new HashMap<>();
+        propsForValueSmt.put(CamelTypeConverterTransform.FIELD_TARGET_TYPE_CONFIG, "java.lang.String");
+
+        final Transformation<SourceRecord> transformationValue = new CamelTypeConverterTransform.Value<>();
+
+        transformationValue.configure(propsForValueSmt);
+
+        final SourceRecord transformedValueSourceRecord = transformationValue.apply(connectRecord);
+
+        assertEquals(java.lang.String.class, transformedValueSourceRecord.value().getClass());
+        assertEquals(Schema.STRING_SCHEMA, transformedValueSourceRecord.valueSchema());
+        assertEquals(testMessage, transformedValueSourceRecord.value());
+    }
+
+    @Test
     public void testIfHandlesTypeConvertersFromCamelComponents() {
         // we know we have a type converter from struct to map in dbz component, so we use this for testing
         final Schema schema = SchemaBuilder.struct()
diff --git a/parent/pom.xml b/parent/pom.xml
index 85a4fa7..4230a7f 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -35,6 +35,7 @@
         <version.guava>20.0</version.guava>
         <version.javax.annotation-api>1.3.2</version.javax.annotation-api>
         <version.postgres>42.2.14</version.postgres>
+        <version.jctools>3.3.0</version.jctools>
 
         <version.maven.compiler>3.8.1</version.maven.compiler>
         <version.maven.javadoc>3.1.1</version.maven.javadoc>
@@ -57,7 +58,6 @@
         <!-- Note: we are deliberately overriding this one due to GH issue #990 -->
         <testcontainers-version>1.15.2</testcontainers-version>
 
-
         <mycila-license-version>3.0</mycila-license-version>
         <gmavenplus-plugin-version>1.9.0</gmavenplus-plugin-version>
         <groovy-version>3.0.7</groovy-version>
@@ -116,6 +116,12 @@
                 <version>${version.guava}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.jctools</groupId>
+                <artifactId>jctools-core</artifactId>
+                <version>${version.jctools}</version>
+            </dependency>
+
             <!--  Kafka dependencies -->
             <dependency>
                 <groupId>org.apache.kafka</groupId>