You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/03/07 13:45:35 UTC
[camel-kafka-connector] 02/05: Properly handling UnitOfWork by
completing it at the last possible moment, properly fix #202
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 2a1a50760146ef13a52ce0b622cedf39e04b7c34
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Fri Mar 5 22:15:53 2021 +0100
Properly handling UnitOfWork by completing it at the last possible moment, properly fix #202
---
core/pom.xml | 19 ++++-
.../kafkaconnector/CamelSourceConnectorConfig.java | 5 ++
.../camel/kafkaconnector/CamelSourceRecord.java | 43 ++++++++++
.../camel/kafkaconnector/CamelSourceTask.java | 96 +++++++++++++++++-----
.../camel/kafkaconnector/CamelSourceTaskTest.java | 50 +++++++++++
.../CamelTypeConverterTransformTest.java | 25 ++++++
parent/pom.xml | 8 +-
7 files changed, 219 insertions(+), 27 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 2e32d16..f59f70d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -49,6 +49,10 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-seda</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
</dependency>
<dependency>
@@ -56,6 +60,13 @@
<artifactId>camel-core-languages</artifactId>
</dependency>
+ <!-- Tools -->
+ <dependency>
+ <groupId>org.jctools</groupId>
+ <artifactId>jctools-core</artifactId>
+ <version>${version.jctools}</version>
+ </dependency>
+
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
@@ -108,22 +119,22 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-seda</artifactId>
+ <artifactId>camel-timer</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-timer</artifactId>
+ <artifactId>camel-log</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-log</artifactId>
+ <artifactId>camel-slack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-slack</artifactId>
+ <artifactId>camel-netty-http</artifactId>
<scope>test</scope>
</dependency>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index bb4f8f8..4acfa62 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -54,6 +54,10 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
public static final String CAMEL_SOURCE_MAX_POLL_DURATION_CONF = "camel.source.maxPollDuration";
public static final String CAMEL_SOURCE_MAX_POLL_DURATION_DOC = "The maximum time in milliseconds spent in a single call to poll()";
+ public static final Integer CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DEFAULT = 1024;
+ public static final String CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF = "camel.source.maxNotCommittedRecords";
+ public static final String CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DOC = "The maximum number of non committed kafka connect records that can be tolerated before stop polling new records (rounded to the next power of 2) with a minimum of 4.";
+
public static final Long CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT = 1000L;
public static final String CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF = "camel.source.pollingConsumerQueueSize";
public static final String CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC = "The queue size for the internal hand-off queue between the polling consumer, and producers sending data into the queue.";
@@ -82,6 +86,7 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
.define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.Importance.HIGH, TOPIC_DOC)
.define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC)
.define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
+ .define(CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF, Type.INT, CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DOC)
.define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC)
.define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
.define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
new file mode 100644
index 0000000..87934ef
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java
@@ -0,0 +1,43 @@
+package org.apache.camel.kafkaconnector;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Map;
+
+public class CamelSourceRecord extends SourceRecord {
+ private Integer claimCheck = null;
+
+ public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema valueSchema, Object value) {
+ super(sourcePartition, sourceOffset, topic, partition, valueSchema, value);
+ }
+
+ public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Schema valueSchema, Object value) {
+ super(sourcePartition, sourceOffset, topic, valueSchema, value);
+ }
+
+ public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Schema keySchema, Object key, Schema valueSchema, Object value) {
+ super(sourcePartition, sourceOffset, topic, keySchema, key, valueSchema, value);
+ }
+
+ public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value) {
+ super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value);
+ }
+
+ public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) {
+ super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value, timestamp);
+ }
+
+ public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) {
+ super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value, timestamp, headers);
+ }
+
+ public Integer getClaimCheck() {
+ return claimCheck;
+ }
+
+ public void setClaimCheck(Integer claimCheck) {
+ this.claimCheck = claimCheck;
+ }
+}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 16e6bfc..03d0c1a 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -16,30 +16,37 @@
*/
package org.apache.camel.kafkaconnector;
-import java.math.BigDecimal;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.PollingConsumer;
+import org.apache.camel.StreamCache;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
import org.apache.camel.kafkaconnector.utils.SchemaHelper;
import org.apache.camel.kafkaconnector.utils.TaskHelper;
+import org.apache.camel.support.UnitOfWorkHelper;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
public class CamelSourceTask extends SourceTask {
public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";
@@ -49,18 +56,23 @@ public class CamelSourceTask extends SourceTask {
private static final String CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX = "camel.source.endpoint.";
private static final String CAMEL_SOURCE_PATH_PROPERTIES_PREFIX = "camel.source.path.";
- private static final String LOCAL_URL = "direct:end";
+ private static final String LOCAL_URL = "seda:end";
private CamelKafkaConnectMain cms;
private PollingConsumer consumer;
private String[] topics;
private Long maxBatchPollSize;
private Long maxPollDuration;
+ private Integer maxNotCommittedRecords;
private String camelMessageHeaderKey;
private LoggingLevel loggingLevel = LoggingLevel.OFF;
+ private Exchange[] exchangesWaitingForAck;
+ //the assumption is that at most 1 thread is running poll() method and at most 1 thread is running commitRecord()
+ private SpscArrayQueue<Integer> freeSlots;
private boolean mapProperties;
private boolean mapHeaders;
+
@Override
public String version() {
return VersionUtil.getVersion();
@@ -82,6 +94,7 @@ public class CamelSourceTask extends SourceTask {
maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
maxPollDuration = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF);
+ maxNotCommittedRecords = config.getInt(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF);
camelMessageHeaderKey = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF);
@@ -105,10 +118,24 @@ public class CamelSourceTask extends SourceTask {
final String headersRemovePattern = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
mapProperties = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);
mapHeaders = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF);
-
+
topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
- String localUrl = getLocalUrlWithPollingOptions(config);
+ long pollingConsumerQueueSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF);
+ long pollingConsumerBlockTimeout = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF);
+ boolean pollingConsumerBlockWhenFull = config.getBoolean(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF);
+ String localUrl = getLocalUrlWithPollingOptions(pollingConsumerQueueSize, pollingConsumerBlockTimeout, pollingConsumerBlockWhenFull);
+
+ freeSlots = new SpscArrayQueue<>(maxNotCommittedRecords);
+ freeSlots.fill(new MessagePassingQueue.Supplier<Integer>() {
+ int i = 0;
+ @Override
+ public Integer get() {
+ return i++;
+ }
+ });
+ //needs to be done like this because freeSlots capacity is rounded to the next power of 2 of maxNotCommittedRecords
+ exchangesWaitingForAck = new Exchange[freeSlots.capacity()];
CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
@@ -155,13 +182,14 @@ public class CamelSourceTask extends SourceTask {
@Override
public synchronized List<SourceRecord> poll() {
+ LOG.debug("Number of records waiting an ack: {}", freeSlots.capacity() - freeSlots.size());
final long startPollEpochMilli = Instant.now().toEpochMilli();
long remaining = remaining(startPollEpochMilli, maxPollDuration);
long collectedRecords = 0L;
List<SourceRecord> records = new ArrayList<>();
- while (collectedRecords < maxBatchPollSize && remaining > 0) {
+ while (collectedRecords < maxBatchPollSize && freeSlots.size() >= topics.length && remaining > 0) {
Exchange exchange = consumer.receive(remaining);
if (exchange == null) {
// Nothing received, abort and return what we received so far
@@ -177,31 +205,46 @@ public class CamelSourceTask extends SourceTask {
Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
- final Object messageBodyValue = exchange.getMessage().getBody();
+ Object messageBodyValue = exchange.getMessage().getBody();
final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
final long timestamp = calculateTimestamp(exchange);
+ // take in account Cached camel streams
+ if (messageBodyValue instanceof StreamCache) {
+ StreamCache sc = (StreamCache) messageBodyValue;
+ // reset to be sure that the cache is ready to be used before sending it in the record (could be useful for SMTs)
+ sc.reset();
+ try {
+ messageBodyValue = sc.copy(exchange);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
for (String singleTopic : topics) {
- SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema,
+ CamelSourceRecord camelRecord = new CamelSourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema,
messageHeaderKey, messageBodySchema, messageBodyValue, timestamp);
if (mapHeaders) {
if (exchange.getMessage().hasHeaders()) {
- setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
+ setAdditionalHeaders(camelRecord, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
}
}
if (mapProperties) {
if (exchange.hasProperties()) {
- setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
+ setAdditionalHeaders(camelRecord, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
}
}
- TaskHelper.logRecordContent(LOG, loggingLevel, record);
- records.add(record);
+ TaskHelper.logRecordContent(LOG, loggingLevel, camelRecord);
+ Integer claimCheck = freeSlots.remove();
+ camelRecord.setClaimCheck(claimCheck);
+ exchangesWaitingForAck[claimCheck] = exchange;
+ LOG.debug("Record: {}, containing data from exchange: {}, is associated with claim check number: {}", camelRecord, exchange, claimCheck);
+ records.add(camelRecord);
}
collectedRecords++;
remaining = remaining(startPollEpochMilli, maxPollDuration);
@@ -211,6 +254,18 @@ public class CamelSourceTask extends SourceTask {
}
@Override
+ public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {
+ ///XXX: this should be a safe cast please see: https://issues.apache.org/jira/browse/KAFKA-12391
+ Integer claimCheck = ((CamelSourceRecord)record).getClaimCheck();
+ LOG.debug("Committing record with claim check number: {}", claimCheck);
+ Exchange correlatedExchange = exchangesWaitingForAck[claimCheck];
+ exchangesWaitingForAck[claimCheck] = null;
+ freeSlots.add(claimCheck);
+ UnitOfWorkHelper.doneSynchronizations(correlatedExchange, correlatedExchange.adapt(ExtendedExchange.class).handoverCompletions(), LOG);
+ LOG.debug("Record with claim check number: {} committed.", claimCheck);
+ }
+
+ @Override
public void stop() {
LOG.info("Stopping CamelSourceTask connector task");
try {
@@ -301,10 +356,7 @@ public class CamelSourceTask extends SourceTask {
}
}
- private String getLocalUrlWithPollingOptions(CamelSourceConnectorConfig config) {
- long pollingConsumerQueueSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF);
- long pollingConsumerBlockTimeout = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF);
- boolean pollingConsumerBlockWhenFull = config.getBoolean(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF);
+ private String getLocalUrlWithPollingOptions(long pollingConsumerQueueSize, long pollingConsumerBlockTimeout, boolean pollingConsumerBlockWhenFull) {
return LOCAL_URL + "?pollingConsumerQueueSize=" + pollingConsumerQueueSize + "&pollingConsumerBlockTimeout=" + pollingConsumerBlockTimeout
+ "&pollingConsumerBlockWhenFull=" + pollingConsumerBlockWhenFull;
}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 21d56fc..51b4db3 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -16,15 +16,19 @@
*/
package org.apache.camel.kafkaconnector;
+import java.awt.print.PrinterJob;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.kafkaconnector.utils.StringJoinerAggregator;
@@ -77,6 +81,24 @@ public class CamelSourceTaskTest {
}
@Test
+ public void testSourcePollingMaxNotCommittedRecords() {
+ final long size = 4;
+ Map<String, String> props = new HashMap<>();
+ props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI);
+ props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF, String.valueOf(size));
+
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(props);
+
+ sendBatchOfRecords(sourceTask, size + 1);
+ List<SourceRecord> poll = sourceTask.poll();
+
+ assertEquals(4, poll.size());
+ sourceTask.stop();
+ }
+
+ @Test
public void testSourcePollingMaxBatchPollSize() {
final long size = 2;
Map<String, String> props = new HashMap<>();
@@ -621,4 +643,32 @@ public class CamelSourceTaskTest {
sourceTask.stop();
}
+
+ @Test
+ public void testRequestReply() throws InterruptedException {
+ Map<String, String> props = new HashMap<>();
+ props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI);
+
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(props);
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ final ProducerTemplate template = sourceTask.getCms().getProducerTemplate();
+ String result = template.requestBody(DIRECT_URI, "test", String.class);
+ assertEquals("test", result);
+ }
+ });
+
+ List<SourceRecord> poll = sourceTask.poll();
+ assertEquals(1, poll.size());
+
+ sourceTask.commitRecord(poll.get(0), null);
+
+ sourceTask.stop();
+ executor.shutdown();
+ }
}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
index 92c668b..c6cecbf 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
@@ -16,10 +16,14 @@
*/
package org.apache.camel.kafkaconnector.transforms;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import io.netty.buffer.Unpooled;
+import org.apache.camel.component.netty.http.NettyChannelBufferStreamCache;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -63,6 +67,27 @@ public class CamelTypeConverterTransformTest {
}
@Test
+ public void testIfItConvertsNettyCorrectly() {
+ final String testMessage = "testMessage";
+ NettyChannelBufferStreamCache nettyTestValue = new NettyChannelBufferStreamCache(Unpooled.wrappedBuffer(testMessage.getBytes(Charset.defaultCharset())));
+
+ final SourceRecord connectRecord = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "1234", Schema.BYTES_SCHEMA, nettyTestValue);
+
+ final Map<String, Object> propsForValueSmt = new HashMap<>();
+ propsForValueSmt.put(CamelTypeConverterTransform.FIELD_TARGET_TYPE_CONFIG, "java.lang.String");
+
+ final Transformation<SourceRecord> transformationValue = new CamelTypeConverterTransform.Value<>();
+
+ transformationValue.configure(propsForValueSmt);
+
+ final SourceRecord transformedValueSourceRecord = transformationValue.apply(connectRecord);
+
+ assertEquals(java.lang.String.class, transformedValueSourceRecord.value().getClass());
+ assertEquals(Schema.STRING_SCHEMA, transformedValueSourceRecord.valueSchema());
+ assertEquals(testMessage, transformedValueSourceRecord.value());
+ }
+
+ @Test
public void testIfHandlesTypeConvertersFromCamelComponents() {
// we know we have a type converter from struct to map in dbz component, so we use this for testing
final Schema schema = SchemaBuilder.struct()
diff --git a/parent/pom.xml b/parent/pom.xml
index 85a4fa7..4230a7f 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -35,6 +35,7 @@
<version.guava>20.0</version.guava>
<version.javax.annotation-api>1.3.2</version.javax.annotation-api>
<version.postgres>42.2.14</version.postgres>
+ <version.jctools>3.3.0</version.jctools>
<version.maven.compiler>3.8.1</version.maven.compiler>
<version.maven.javadoc>3.1.1</version.maven.javadoc>
@@ -57,7 +58,6 @@
<!-- Note: we are deliberately overriding this one due to GH issue #990 -->
<testcontainers-version>1.15.2</testcontainers-version>
-
<mycila-license-version>3.0</mycila-license-version>
<gmavenplus-plugin-version>1.9.0</gmavenplus-plugin-version>
<groovy-version>3.0.7</groovy-version>
@@ -116,6 +116,12 @@
<version>${version.guava}</version>
</dependency>
+ <dependency>
+ <groupId>org.jctools</groupId>
+ <artifactId>jctools-core</artifactId>
+ <version>${version.jctools}</version>
+ </dependency>
+
<!-- Kafka dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>