You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/24 08:19:25 UTC

[camel-kafka-connector] 03/18: Added Memory Idempotency support for both sink and source

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 5465d13ef8f012a389123e3523c471e6c40cacc7
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Nov 17 19:40:15 2020 +0100

    Added Memory Idempotency support for both sink and source
---
 .../camel/kafkaconnector/CamelConnectorConfig.java |  16 ++
 .../kafkaconnector/CamelSinkConnectorConfig.java   |   8 +-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |   9 ++
 .../kafkaconnector/CamelSourceConnectorConfig.java |   8 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |   9 +-
 .../utils/CamelKafkaConnectMain.java               |  69 ++++++++-
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 172 +++++++++++++++++++++
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  99 ++++++++++++
 8 files changed, 379 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
index 4271938..0332576 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -47,6 +47,22 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
     public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF = "camel.error.handler.redelivery.delay";
     public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC = "The initial redelivery delay in milliseconds in case of Default Error Handler";
     
+    public static final Boolean CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT = false;
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF = "camel.idempotency.enabled";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC = "If in memory idempotency must be enabled or not";
+    
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT = "body";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF = "camel.idempotency.expression.type";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC = "How the idempotency will be evaluated: possible values are body and header";
+    
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT = null;
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF = "camel.idempotency.expression.header";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC = "The header name that will be evaluated in case of camel.idempotency.expression.type equals to header";    
+    
+    public static final int CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT = 100;
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF = "camel.idempotency.memory.dimension";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC = "The Memory dimension of the in memory idempotent Repository";
+    
     protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
         super(definition, originals, configProviderProps, doLog);
     }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index e86e921..299d578 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -57,8 +57,12 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC)
         .define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC)
         .define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC)
-        .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC);
-
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC);
+    
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
     }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index a38afea..2cc01a2 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -85,6 +85,11 @@ public class CamelSinkTask extends SinkTask {
             final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF);
             final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF);
             final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
+            final Boolean idempotencyEnabled = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF);
+            final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
+            final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
+            final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
+            
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
                 remoteUrl = TaskHelper.buildUrl(camelContext,
@@ -103,6 +108,10 @@ public class CamelSinkTask extends SinkTask {
                 .withErrorHandler(errorHandler)
                 .withMaxRedeliveries(maxRedeliveries)
                 .withRedeliveryDelay(redeliveryDelay)
+                .withIdempotencyEnabled(idempotencyEnabled)
+                .withExpressionType(expressionType)
+                .withExpressionHeader(expressionHeader)
+                .withMemoryDimension(memoryDimension)
                 .build(camelContext);
 
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 2ecfb45..70e2b09 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -93,8 +93,12 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC)
         .define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC)
         .define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC)
-        .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC);
-
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC);
+    
     public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
     }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 165ffff..e32d1c2 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -91,7 +91,10 @@ public class CamelSourceTask extends SourceTask {
             final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF);
             final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF);
             final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
-            
+            final Boolean idempotencyEnabled = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF);
+            final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
+            final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
+            final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
             topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
             String localUrl = getLocalUrlWithPollingOptions(config);
@@ -113,6 +116,10 @@ public class CamelSourceTask extends SourceTask {
                 .withErrorHandler(errorHandler)
                 .withMaxRedeliveries(maxRedeliveries)
                 .withRedeliveryDelay(redeliveryDelay)
+                .withIdempotencyEnabled(idempotencyEnabled)
+                .withExpressionType(expressionType)
+                .withExpressionHeader(expressionHeader)
+                .withMemoryDimension(memoryDimension)
                 .build(camelContext);
 
             consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 7a43f0f..d0695bd 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -28,6 +28,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.kafkaconnector.CamelConnectorConfig;
 import org.apache.camel.main.SimpleMain;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -94,6 +95,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
         private String errorHandler;
         private int maxRedeliveries;
         private long redeliveryDelay;
+        private boolean idempotencyEnabled;
+        private String expressionType;
+        private String expressionHeader;
+        private int memoryDimension;
 
         public Builder(String from, String to) {
             this.from = from;
@@ -139,6 +144,26 @@ public class CamelKafkaConnectMain extends SimpleMain {
             this.redeliveryDelay = redeliveryDelay;
             return this;
         }
+        
+        public Builder withIdempotencyEnabled(boolean idempotencyEnabled) {
+        	this.idempotencyEnabled = idempotencyEnabled;
+        	return this;
+        }
+        
+        public Builder withExpressionType(String expressionType) {
+        	this.expressionType = expressionType;
+        	return this;
+        }
+        
+        public Builder withExpressionHeader(String expressionHeader) {
+        	this.expressionHeader = expressionHeader;
+        	return this;
+        }
+        
+        public Builder withMemoryDimension(int memoryDimension) {
+        	this.memoryDimension = memoryDimension;
+        	return this;
+        }
 
         public CamelKafkaConnectMain build(CamelContext camelContext) {
             CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
@@ -183,13 +208,45 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {
                         //aggregation
                         AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
-                        LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
-                        LOG.info(".to({})", to);
-                        rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
+                        if (idempotencyEnabled) {
+                            switch (expressionType) {
+                                case "body":
+                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
+                                    LOG.info(".to({})", to);
+                                	rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    break;
+                                case "header":
+                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
+                                    LOG.info(".to({})", to);
+                                    rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    break;
+                                default:
+                                    break;
+                            }
+                        } else {
+                            LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
+                            LOG.info(".to({})", to);
+                            rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
+                        }
                     } else {
-                        //to
-                        LOG.info(".to({})", to);
-                        rd.toD(to);
+                        if (idempotencyEnabled) {
+                            switch (expressionType) {
+                                case "body":
+                                	LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
+                                	rd.idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    break;
+                                case "header":
+                                	LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
+                                    rd.idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    break;
+                                default:
+                                    break;
+                            }
+                        } else {
+                            //to
+                            LOG.info(".to({})", to);
+                            rd.toD(to);
+                        }
                     }
                 }
             });
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 6559456..9472206 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -29,8 +29,10 @@ import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.Test;
 
@@ -663,6 +665,176 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
+    
+    @Test
+    public void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5");
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100");
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body");
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        SinkRecord record1 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel1", 42);
+        SinkRecord record2 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel2", 42);
+        SinkRecord record3 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel3", 42);
+        SinkRecord record4 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel4", 42);
+        SinkRecord record5 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        SinkRecord record6 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel1", 42);
+        SinkRecord record7 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel2", 42);
+        SinkRecord record8 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel3", 42);
+        SinkRecord record9 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel4", 42);
+        SinkRecord record10 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel5", 42);
+        SinkRecord record11 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel6", 42);
+        SinkRecord record12 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel7", 42);
+        SinkRecord record13 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel8", 42);
+        SinkRecord record14 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel9", 42);
+        records.add(record);
+        records.add(record1);
+        records.add(record2);
+        records.add(record3);
+        records.add(record4);
+        records.add(record5);
+        records.add(record6);
+        records.add(record7);
+        records.add(record8);
+        records.add(record9);
+        records.add(record10);
+        records.add(record11);
+        records.add(record12);
+        records.add(record13);
+        records.add(record14);
+
+        sinkTask.put(records);
+
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("camel camel1 camel2 camel3 camel4", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+            .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+        
+        exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("camel5 camel6 camel7 camel8 camel9", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+            .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+
+        sinkTask.stop();
+    }
+    
+    @Test
+    public void testWithIdempotency() throws InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body");
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        SinkRecord record1 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        SinkRecord record2 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        SinkRecord record3 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        SinkRecord record4 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel1", 42);
+        SinkRecord record5 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        SinkRecord record6 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        SinkRecord record7 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        SinkRecord record8 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        SinkRecord record9 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel2", 42);
+        SinkRecord record10 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        SinkRecord record11 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        SinkRecord record12 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        SinkRecord record13 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        SinkRecord record14 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        records.add(record);
+        records.add(record1);
+        records.add(record2);
+        records.add(record3);
+        records.add(record4);
+        records.add(record5);
+        records.add(record6);
+        records.add(record7);
+        records.add(record8);
+        records.add(record9);
+        records.add(record10);
+        records.add(record11);
+        records.add(record12);
+        records.add(record13);
+        records.add(record14);
+
+        sinkTask.put(records);
+
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+            .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+        
+        exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("camel1", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+            .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+        
+        exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("camel2", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+            .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+
+        sinkTask.stop();
+    }
+    
+    @Test
+    public void testWithIdempotencyAndHeader() throws InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header");
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency");
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        record.headers().add("CamelHeader.headerIdempotency", new SchemaAndValue(Schema.STRING_SCHEMA, "Test"));
+        SinkRecord record1 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        record1.headers().add("CamelHeader.headerIdempotency", new SchemaAndValue(Schema.STRING_SCHEMA, "Test"));
+        SinkRecord record2 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel1", 42);
+        record2.headers().add("CamelHeader.headerIdempotency", new SchemaAndValue(Schema.STRING_SCHEMA, "Test1"));
+
+        records.add(record);
+        records.add(record1);
+        records.add(record2);
+
+        sinkTask.put(records);
+
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+            .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+        
+        exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("camel1", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+            .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+
+        sinkTask.stop();
+    }
 
     @Test
     public void testSecretRaw() {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index ef393f9..ea7bb1b 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -436,4 +436,103 @@ public class CamelSourceTaskTest {
             sourceTask.stop();
         }
     }
+    
+    @Test
+    public void testSourcePollingWithIdempotencyEnabledAndBody() {
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(mapOf(
+            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+            CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body"
+        ));
+
+        try {
+
+        	sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
+        	sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test1");
+        	sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
+        	sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test2");
+
+            List<SourceRecord> records = sourceTask.poll();
+
+            assertThat(records).hasSize(3);
+            assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "Test");
+            assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "Test1");
+            assertThat(records).element(2).hasFieldOrPropertyWithValue("value", "Test2");
+        } finally {
+            sourceTask.stop();
+        }
+    }
+    
+    @Test
+    public void testSourcePollingWithIdempotencyEnabledAndHeader() {
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(mapOf(
+            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+            CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header",
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency"
+        ));
+
+        try {
+
+        	sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test", "headerIdempotency", "Test");
+        	sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test1", "headerIdempotency", "Test1");
+        	sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "TestTest", "headerIdempotency", "Test");
+        	sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test2", "headerIdempotency", "Test2");
+
+            List<SourceRecord> records = sourceTask.poll();
+
+            assertThat(records).hasSize(3);
+            assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "Test");
+            assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "Test1");
+            assertThat(records).element(2).hasFieldOrPropertyWithValue("value", "Test2");
+        } finally {
+            sourceTask.stop();
+        }
+    }
+    
+    @Test
+    public void testSourcePollingWithAggregationAndIdempotencyBySizeAndTimeout() {
+        final int chunkSize = 2;
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(mapOf(
+            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+            CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body",
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator",
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|",
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize
+        ));
+
+        try {
+            assertThat(sourceTask.getCms().getCamelContext().getRegistry().lookupByName(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME))
+                .isInstanceOf(StringJoinerAggregator.class)
+                .hasFieldOrPropertyWithValue("delimiter", "|");
+
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 0);
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 1);
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 2);
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 3);
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 0);
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 1);
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 3);
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 2);
+            
+            List<SourceRecord> records = sourceTask.poll();
+
+            assertThat(records).hasSize(3);
+            assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "0|1");
+            assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "2|3");
+            assertThat(records).element(2).hasFieldOrPropertyWithValue("value", "3|2");
+        } finally {
+            sourceTask.stop();
+        }
+    }
 }