You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/17 20:52:39 UTC

[camel-kafka-connector] branch idempotency created (now 8cd6e2c)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch idempotency
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


      at 8cd6e2c  Fixed CS

This branch includes the following new commits:

     new ee5d8e9  Added Memory Idempotency support for both sink and source
     new 8cd6e2c  Fixed CS

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector] 02/02: Fixed CS

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch idempotency
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 8cd6e2cef43d4951f1c2895b5a2c8a1d10ca6e6e
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Nov 17 21:51:54 2020 +0100

    Fixed CS
---
 .../camel/kafkaconnector/CamelSourceTask.java      |  1 +
 .../utils/CamelKafkaConnectMain.java               | 33 ++++++------
 .../camel/kafkaconnector/CamelSinkTaskTest.java    |  1 -
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 59 +++++++++-------------
 4 files changed, 43 insertions(+), 51 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index e32d1c2..4e893ab 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -95,6 +95,7 @@ public class CamelSourceTask extends SourceTask {
             final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
             final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
             final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
+            
             topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
             String localUrl = getLocalUrlWithPollingOptions(config);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index d0695bd..4954ed9 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -146,23 +146,23 @@ public class CamelKafkaConnectMain extends SimpleMain {
         }
         
         public Builder withIdempotencyEnabled(boolean idempotencyEnabled) {
-        	this.idempotencyEnabled = idempotencyEnabled;
-        	return this;
+            this.idempotencyEnabled = idempotencyEnabled;
+            return this;
         }
         
         public Builder withExpressionType(String expressionType) {
-        	this.expressionType = expressionType;
-        	return this;
+            this.expressionType = expressionType;
+            return this;
         }
         
         public Builder withExpressionHeader(String expressionHeader) {
-        	this.expressionHeader = expressionHeader;
-        	return this;
+            this.expressionHeader = expressionHeader;
+            return this;
         }
         
         public Builder withMemoryDimension(int memoryDimension) {
-        	this.memoryDimension = memoryDimension;
-        	return this;
+            this.memoryDimension = memoryDimension;
+            return this;
         }
 
         public CamelKafkaConnectMain build(CamelContext camelContext) {
@@ -211,14 +211,17 @@ public class CamelKafkaConnectMain extends SimpleMain {
                         if (idempotencyEnabled) {
                             switch (expressionType) {
                                 case "body":
-                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
+                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), "
+                                           + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
                                     LOG.info(".to({})", to);
-                                	rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
                                     break;
                                 case "header":
-                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
+                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), "
+                                           + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
                                     LOG.info(".to({})", to);
-                                    rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
+                                        .idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
                                     break;
                                 default:
                                     break;
@@ -232,11 +235,11 @@ public class CamelKafkaConnectMain extends SimpleMain {
                         if (idempotencyEnabled) {
                             switch (expressionType) {
                                 case "body":
-                                	LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
-                                	rd.idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
+                                    rd.idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
                                     break;
                                 case "header":
-                                	LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
+                                    LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
                                     rd.idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
                                     break;
                                 default:
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 9472206..96e7eb6 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.Test;
 
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index ea7bb1b..c6fa7eb 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -441,19 +441,16 @@ public class CamelSourceTaskTest {
     public void testSourcePollingWithIdempotencyEnabledAndBody() {
 
         CamelSourceTask sourceTask = new CamelSourceTask();
-        sourceTask.start(mapOf(
-            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
-            CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body"
-        ));
+        sourceTask.start(mapOf(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+                               CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
+                               CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body"));
 
         try {
 
-        	sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
-        	sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test1");
-        	sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
-        	sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test2");
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test1");
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test2");
 
             List<SourceRecord> records = sourceTask.poll();
 
@@ -465,25 +462,22 @@ public class CamelSourceTaskTest {
             sourceTask.stop();
         }
     }
-    
+
     @Test
     public void testSourcePollingWithIdempotencyEnabledAndHeader() {
 
         CamelSourceTask sourceTask = new CamelSourceTask();
-        sourceTask.start(mapOf(
-            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
-            CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header",
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency"
-        ));
+        sourceTask
+            .start(mapOf(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+                         CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF,
+                         "header", CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency"));
 
         try {
 
-        	sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test", "headerIdempotency", "Test");
-        	sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test1", "headerIdempotency", "Test1");
-        	sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "TestTest", "headerIdempotency", "Test");
-        	sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test2", "headerIdempotency", "Test2");
+            sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test", "headerIdempotency", "Test");
+            sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test1", "headerIdempotency", "Test1");
+            sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "TestTest", "headerIdempotency", "Test");
+            sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test2", "headerIdempotency", "Test2");
 
             List<SourceRecord> records = sourceTask.poll();
 
@@ -495,26 +489,21 @@ public class CamelSourceTaskTest {
             sourceTask.stop();
         }
     }
-    
+
     @Test
     public void testSourcePollingWithAggregationAndIdempotencyBySizeAndTimeout() {
         final int chunkSize = 2;
 
         CamelSourceTask sourceTask = new CamelSourceTask();
-        sourceTask.start(mapOf(
-            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
-            CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body",
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator",
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|",
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize
-        ));
+        sourceTask
+            .start(mapOf(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+                         CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF,
+                         "body", CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator",
+                         CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|", CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize));
 
         try {
             assertThat(sourceTask.getCms().getCamelContext().getRegistry().lookupByName(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME))
-                .isInstanceOf(StringJoinerAggregator.class)
-                .hasFieldOrPropertyWithValue("delimiter", "|");
+                .isInstanceOf(StringJoinerAggregator.class).hasFieldOrPropertyWithValue("delimiter", "|");
 
             sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 0);
             sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 1);
@@ -524,7 +513,7 @@ public class CamelSourceTaskTest {
             sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 1);
             sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 3);
             sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 2);
-            
+
             List<SourceRecord> records = sourceTask.poll();
 
             assertThat(records).hasSize(3);


[camel-kafka-connector] 01/02: Added Memory Idempotency support for both sink and source

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch idempotency
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit ee5d8e96b6c10e893f5f38979a59e8e2ede33b56
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Nov 17 19:40:15 2020 +0100

    Added Memory Idempotency support for both sink and source
---
 .../camel/kafkaconnector/CamelConnectorConfig.java |  16 ++
 .../kafkaconnector/CamelSinkConnectorConfig.java   |   8 +-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |   9 ++
 .../kafkaconnector/CamelSourceConnectorConfig.java |   8 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |   9 +-
 .../utils/CamelKafkaConnectMain.java               |  69 ++++++++-
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 172 +++++++++++++++++++++
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  99 ++++++++++++
 8 files changed, 379 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
index 4271938..0332576 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -47,6 +47,22 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
     public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF = "camel.error.handler.redelivery.delay";
     public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC = "The initial redelivery delay in milliseconds in case of Default Error Handler";
     
+    public static final Boolean CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT = false;
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF = "camel.idempotency.enabled";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC = "If in memory idempotency must be enabled or not";
+    
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT = "body";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF = "camel.idempotency.expression.type";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC = "How the idempotency will be evaluated: possible values are body and header";
+    
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT = null;
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF = "camel.idempotency.expression.header";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC = "The header name that will be evaluated in case of camel.idempotency.expression.type equals to header";    
+    
+    public static final int CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT = 100;
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF = "camel.idempotency.memory.dimension";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC = "The Memory dimension of the in memory idempotent Repository";
+    
     protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
         super(definition, originals, configProviderProps, doLog);
     }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index e86e921..299d578 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -57,8 +57,12 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC)
         .define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC)
         .define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC)
-        .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC);
-
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC);
+    
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
     }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index a38afea..2cc01a2 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -85,6 +85,11 @@ public class CamelSinkTask extends SinkTask {
             final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF);
             final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF);
             final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
+            final Boolean idempotencyEnabled = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF);
+            final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
+            final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
+            final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
+            
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
                 remoteUrl = TaskHelper.buildUrl(camelContext,
@@ -103,6 +108,10 @@ public class CamelSinkTask extends SinkTask {
                 .withErrorHandler(errorHandler)
                 .withMaxRedeliveries(maxRedeliveries)
                 .withRedeliveryDelay(redeliveryDelay)
+                .withIdempotencyEnabled(idempotencyEnabled)
+                .withExpressionType(expressionType)
+                .withExpressionHeader(expressionHeader)
+                .withMemoryDimension(memoryDimension)
                 .build(camelContext);
 
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 2ecfb45..70e2b09 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -93,8 +93,12 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC)
         .define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC)
         .define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC)
-        .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC);
-
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC);
+    
     public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
     }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 165ffff..e32d1c2 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -91,7 +91,10 @@ public class CamelSourceTask extends SourceTask {
             final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF);
             final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF);
             final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
-            
+            final Boolean idempotencyEnabled = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF);
+            final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
+            final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
+            final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
             topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
             String localUrl = getLocalUrlWithPollingOptions(config);
@@ -113,6 +116,10 @@ public class CamelSourceTask extends SourceTask {
                 .withErrorHandler(errorHandler)
                 .withMaxRedeliveries(maxRedeliveries)
                 .withRedeliveryDelay(redeliveryDelay)
+                .withIdempotencyEnabled(idempotencyEnabled)
+                .withExpressionType(expressionType)
+                .withExpressionHeader(expressionHeader)
+                .withMemoryDimension(memoryDimension)
                 .build(camelContext);
 
             consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 7a43f0f..d0695bd 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -28,6 +28,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.kafkaconnector.CamelConnectorConfig;
 import org.apache.camel.main.SimpleMain;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -94,6 +95,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
         private String errorHandler;
         private int maxRedeliveries;
         private long redeliveryDelay;
+        private boolean idempotencyEnabled;
+        private String expressionType;
+        private String expressionHeader;
+        private int memoryDimension;
 
         public Builder(String from, String to) {
             this.from = from;
@@ -139,6 +144,26 @@ public class CamelKafkaConnectMain extends SimpleMain {
             this.redeliveryDelay = redeliveryDelay;
             return this;
         }
+        
+        public Builder withIdempotencyEnabled(boolean idempotencyEnabled) {
+        	this.idempotencyEnabled = idempotencyEnabled;
+        	return this;
+        }
+        
+        public Builder withExpressionType(String expressionType) {
+        	this.expressionType = expressionType;
+        	return this;
+        }
+        
+        public Builder withExpressionHeader(String expressionHeader) {
+        	this.expressionHeader = expressionHeader;
+        	return this;
+        }
+        
+        public Builder withMemoryDimension(int memoryDimension) {
+        	this.memoryDimension = memoryDimension;
+        	return this;
+        }
 
         public CamelKafkaConnectMain build(CamelContext camelContext) {
             CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
@@ -183,13 +208,45 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {
                         //aggregation
                         AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
-                        LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
-                        LOG.info(".to({})", to);
-                        rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
+                        if (idempotencyEnabled) {
+                            switch (expressionType) {
+                                case "body":
+                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
+                                    LOG.info(".to({})", to);
+                                	rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    break;
+                                case "header":
+                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
+                                    LOG.info(".to({})", to);
+                                    rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    break;
+                                default:
+                                    break;
+                            }
+                        } else {
+                            LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
+                            LOG.info(".to({})", to);
+                            rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
+                        }
                     } else {
-                        //to
-                        LOG.info(".to({})", to);
-                        rd.toD(to);
+                        if (idempotencyEnabled) {
+                            switch (expressionType) {
+                                case "body":
+                                	LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
+                                	rd.idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    break;
+                                case "header":
+                                	LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
+                                    rd.idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    break;
+                                default:
+                                    break;
+                            }
+                        } else {
+                            //to
+                            LOG.info(".to({})", to);
+                            rd.toD(to);
+                        }
                     }
                 }
             });
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 6559456..9472206 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -29,8 +29,10 @@ import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.Test;
 
@@ -663,6 +665,176 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
+    
+    /**
+     * Verifies body-based idempotency combined with aggregation on the sink route.
+     * <p>
+     * Fifteen records are put on the task in three groups of five, the second group repeating
+     * the values of the first. Only two aggregated exchanges are asserted: one for the first
+     * group and one for the third, i.e. the repeated aggregate is expected to be filtered out.
+     */
+    @Test
+    public void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException {
+        Map<String, String> settings = new HashMap<>();
+        settings.put(TOPIC_CONF, TOPIC_NAME);
+        settings.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        settings.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
+        settings.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5");
+        settings.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100");
+        settings.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
+        settings.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body");
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(settings);
+
+        String[] values = {
+            "camel",
+            "camel1",
+            "camel2",
+            "camel3",
+            "camel4",
+            "camel",
+            "camel1",
+            "camel2",
+            "camel3",
+            "camel4",
+            "camel5",
+            "camel6",
+            "camel7",
+            "camel8",
+            "camel9"
+        };
+
+        List<SinkRecord> batch = new ArrayList<SinkRecord>();
+        for (String value : values) {
+            batch.add(new SinkRecord(TOPIC_NAME, 1, null, "test", null, value, 42));
+        }
+        sinkTask.put(batch);
+
+        String[] expectedBodies = {
+            "camel camel1 camel2 camel3 camel4",
+            "camel5 camel6 camel7 camel8 camel9"
+        };
+
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        for (String expectedBody : expectedBodies) {
+            Exchange received = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+            assertEquals(expectedBody, received.getMessage().getBody());
+            assertEquals("test", received.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+            String contentLogLevel = sinkTask.getCamelSinkConnectorConfig(settings)
+                .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
+            assertEquals(LoggingLevel.OFF.toString(), contentLogLevel);
+        }
+
+        sinkTask.stop();
+    }
+    
+    /**
+     * Verifies body-based idempotency on a plain sink route (no aggregation configured).
+     * <p>
+     * Fifteen records are put on the task, but only three distinct values occur
+     * ({@code camel}, {@code camel1}, {@code camel2}). The first three exchanges received
+     * from the sink endpoint must carry exactly those values, in first-seen order, each
+     * with the Kafka record key header set to {@code test}.
+     */
+    @Test
+    public void testWithIdempotency() throws InterruptedException {
+        Map<String, String> settings = new HashMap<>();
+        settings.put(TOPIC_CONF, TOPIC_NAME);
+        settings.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        settings.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
+        settings.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body");
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(settings);
+
+        // Mostly repeats of "camel"; "camel1" and "camel2" each appear once.
+        String[] values = {
+            "camel",
+            "camel",
+            "camel",
+            "camel",
+            "camel1",
+            "camel",
+            "camel",
+            "camel",
+            "camel",
+            "camel2",
+            "camel",
+            "camel",
+            "camel",
+            "camel",
+            "camel"
+        };
+
+        // All records share the topic, partition 1, key "test" and offset 42.
+        List<SinkRecord> batch = new ArrayList<SinkRecord>();
+        for (String value : values) {
+            batch.add(new SinkRecord(TOPIC_NAME, 1, null, "test", null, value, 42));
+        }
+        sinkTask.put(batch);
+
+        // One exchange per distinct value, in the order each value first appears.
+        String[] expectedBodies = {
+            "camel",
+            "camel1",
+            "camel2"
+        };
+
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        for (String expectedBody : expectedBodies) {
+            // Besides the body, check the record key header and the configured content log level.
+            Exchange received = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+            assertEquals(expectedBody, received.getMessage().getBody());
+            assertEquals("test", received.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+            String contentLogLevel = sinkTask.getCamelSinkConnectorConfig(settings)
+                .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
+            assertEquals(LoggingLevel.OFF.toString(), contentLogLevel);
+        }
+
+        sinkTask.stop();
+    }
+    
+    /**
+     * Verifies header-based idempotency on a plain sink route: three records are put, the first
+     * two sharing the same {@code CamelHeader.headerIdempotency} Kafka header value, and only
+     * two exchanges are asserted (bodies {@code camel} and {@code camel1}).
+     */
+    @Test
+    public void testWithIdempotencyAndHeader() throws InterruptedException {
+        Map<String, String> settings = new HashMap<>();
+        settings.put(TOPIC_CONF, TOPIC_NAME);
+        settings.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        settings.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
+        settings.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header");
+        settings.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency");
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(settings);
+
+        String[][] valueAndKey = {{"camel", "Test"}, {"camel", "Test"}, {"camel1", "Test1"}};
+
+        List<SinkRecord> batch = new ArrayList<SinkRecord>();
+        for (String[] pair : valueAndKey) {
+            SinkRecord sinkRecord = new SinkRecord(TOPIC_NAME, 1, null, "test", null, pair[0], 42);
+            sinkRecord.headers().add("CamelHeader.headerIdempotency", new SchemaAndValue(Schema.STRING_SCHEMA, pair[1]));
+            batch.add(sinkRecord);
+        }
+        sinkTask.put(batch);
+
+        String[] expectedBodies = {"camel", "camel1"};
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        for (String expectedBody : expectedBodies) {
+            Exchange received = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+            assertEquals(expectedBody, received.getMessage().getBody());
+            assertEquals("test", received.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+            String contentLogLevel = sinkTask.getCamelSinkConnectorConfig(settings)
+                .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
+            assertEquals(LoggingLevel.OFF.toString(), contentLogLevel);
+        }
+
+        sinkTask.stop();
+    }
 
     @Test
     public void testSecretRaw() {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index ef393f9..ea7bb1b 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -436,4 +436,103 @@ public class CamelSourceTaskTest {
             sourceTask.stop();
         }
     }
+    
+    /**
+     * Body-keyed idempotency on the source route: four bodies are sent, one of them a repeat, and a single poll must return the three distinct bodies in send order.
+     */
+    @Test
+    public void testSourcePollingWithIdempotencyEnabledAndBody() {
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(mapOf(
+            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+            CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body"
+        ));
+
+        try {
+            for (String body : new String[] {"Test", "Test1", "Test", "Test2"}) {
+                sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, body);
+            }
+
+            List<SourceRecord> records = sourceTask.poll();
+
+            assertThat(records).hasSize(3);
+            assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "Test");
+            assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "Test1");
+            assertThat(records).element(2).hasFieldOrPropertyWithValue("value", "Test2");
+        } finally {
+            sourceTask.stop();
+        }
+    }
+    
+    /**
+     * Header-keyed idempotency on the source route: four messages are sent, the third reusing the first one's {@code headerIdempotency} value, and a single poll must return the other three.
+     */
+    @Test
+    public void testSourcePollingWithIdempotencyEnabledAndHeader() {
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(mapOf(
+            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+            CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header",
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency"
+        ));
+
+        try {
+            for (String[] pair : new String[][] {{"Test", "Test"}, {"Test1", "Test1"}, {"TestTest", "Test"}, {"Test2", "Test2"}}) {
+                sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, pair[0], "headerIdempotency", pair[1]);
+            }
+
+            List<SourceRecord> records = sourceTask.poll();
+
+            assertThat(records).hasSize(3);
+            assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "Test");
+            assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "Test1");
+            assertThat(records).element(2).hasFieldOrPropertyWithValue("value", "Test2");
+        } finally {
+            sourceTask.stop();
+        }
+    }
+    
+    /**
+     * Aggregation (completion size 2, "|" delimiter) plus body-keyed idempotency: eight integer bodies are sent and a single poll must return 0|1, 2|3 and 3|2.
+     */
+    @Test
+    public void testSourcePollingWithAggregationAndIdempotencyBySizeAndTimeout() {
+        final int groupSize = 2;
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(mapOf(
+            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+            CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body",
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator",
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|",
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, groupSize
+        ));
+
+        try {
+            Object aggregator = sourceTask.getCms().getCamelContext().getRegistry().lookupByName(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME);
+            assertThat(aggregator)
+                .isInstanceOf(StringJoinerAggregator.class)
+                .hasFieldOrPropertyWithValue("delimiter", "|");
+
+            for (int body : new int[] {0, 1, 2, 3, 0, 1, 3, 2}) {
+                sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, body);
+            }
+
+            List<SourceRecord> polled = sourceTask.poll();
+
+            String[] expectedValues = {"0|1", "2|3", "3|2"};
+            assertThat(polled).hasSize(expectedValues.length);
+            for (int i = 0; i < expectedValues.length; i++) {
+                assertThat(polled).element(i).hasFieldOrPropertyWithValue("value", expectedValues[i]);
+            }
+        } finally {
+            sourceTask.stop();
+        }
+    }
 }