You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/10/08 10:22:44 UTC

[camel-kafka-connector] branch master updated: core: add support for aggregation on source sinks

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a0c1c60  core: add support for aggregation on source sinks
a0c1c60 is described below

commit a0c1c6015fe05ce9ab473d826401990811591dd1
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Thu Oct 8 11:03:09 2020 +0200

    core: add support for aggregation on source sinks
---
 core/pom.xml                                       |  5 ++
 .../camel/kafkaconnector/CamelConnectorConfig.java | 49 +++++++++++++
 .../kafkaconnector/CamelSinkConnectorConfig.java   | 21 +-----
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  4 +-
 .../kafkaconnector/CamelSourceConnectorConfig.java |  8 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |  6 +-
 .../utils/CamelKafkaConnectMain.java               |  3 +-
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 10 +--
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 85 ++++++++++++++++++++++
 .../utils/StringJoinerAggregator.java              | 54 ++++++++++++++
 10 files changed, 215 insertions(+), 30 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index fe05166..61454c1 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -141,6 +141,11 @@
             <artifactId>camel-aws2-sqs</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
new file mode 100644
index 0000000..fd62052
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+public abstract class CamelConnectorConfig extends AbstractConfig {
+    public static final String CAMEL_CONNECTOR_AGGREGATE_DEFAULT = null;
+    public static final String CAMEL_CONNECTOR_AGGREGATE_NAME = "aggregate";
+    public static final String CAMEL_CONNECTOR_AGGREGATE_CONF = "camel.beans." + CAMEL_CONNECTOR_AGGREGATE_NAME;
+    public static final String CAMEL_CONNECTOR_AGGREGATE_DOC = "A reference to an aggregate bean, in the form of #class:";
+
+    public static final Integer CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT = 10;
+    public static final String CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF = "camel.beans.aggregation.size";
+    public static final String CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC = "The size of the aggregation, to be used in combination with camel.beans.aggregate";
+
+    public static final Long CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT = 500L;
+    public static final String CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF = "camel.beans.aggregation.timeout";
+    public static final String CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC = "The timeout of the aggregation, to be used in combination with camel.beans.aggregate";
+
+    protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
+        super(definition, originals, configProviderProps, doLog);
+    }
+
+    protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals) {
+        super(definition, originals);
+    }
+
+    protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
+        super(definition, originals, doLog);
+    }
+}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index 9b4b2df..980aad1 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -19,12 +19,11 @@ package org.apache.camel.kafkaconnector;
 import java.util.Map;
 
 import org.apache.camel.LoggingLevel;
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 
-public class CamelSinkConnectorConfig extends AbstractConfig {
+public class CamelSinkConnectorConfig extends CamelConnectorConfig {
     public static final String CAMEL_SINK_MARSHAL_DEFAULT = null;
     public static final String CAMEL_SINK_MARSHAL_CONF = "camel.sink.marshal";
     public static final String CAMEL_SINK_MARSHAL_DOC = "The camel dataformat name to use to marshal data to the destination";
@@ -47,27 +46,15 @@ public class CamelSinkConnectorConfig extends AbstractConfig {
     public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_CONF = "camel.sink.contentLogLevel";
     public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_DOC = "Log level for the record's content (default: " + CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT + "). Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF.";
 
-    public static final String CAMEL_SINK_AGGREGATE_DEFAULT = null;
-    public static final String CAMEL_SINK_AGGREGATE_CONF = "camel.beans.aggregate";
-    public static final String CAMEL_SINK_AGGREGATE_DOC = "A reference to an aggregate bean, in the form of #class:";    
-
-    public static final Integer CAMEL_SINK_AGGREGATE_SIZE_DEFAULT = 10;
-    public static final String CAMEL_SINK_AGGREGATE_SIZE_CONF = "camel.beans.aggregation.size";
-    public static final String CAMEL_SINK_AGGREGATE_SIZE_DOC = "The size of the aggregation, to be used in combination with camel.beans.aggregate";
-    
-    public static final Long CAMEL_SINK_AGGREGATE_TIMEOUT_DEFAULT = 500L;
-    public static final String CAMEL_SINK_AGGREGATE_TIMEOUT_CONF = "camel.beans.aggregation.timeout";
-    public static final String CAMEL_SINK_AGGREGATE_TIMEOUT_DOC = "The timeout of the aggregation, to be used in combination with camel.beans.aggregate";    
-
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC)
         .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
         .define(CAMEL_SINK_UNMARSHAL_CONF, Type.STRING, CAMEL_SINK_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_UNMARSHAL_DOC)
         .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC)
         .define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SINK_CONTENT_LOG_LEVEL_DOC)
-        .define(CAMEL_SINK_AGGREGATE_CONF, Type.STRING, CAMEL_SINK_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_DOC)
-        .define(CAMEL_SINK_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_SINK_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_SIZE_DOC)
-        .define(CAMEL_SINK_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_SINK_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_TIMEOUT_DOC);
+        .define(CAMEL_CONNECTOR_AGGREGATE_CONF, Type.STRING, CAMEL_CONNECTOR_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_DOC)
+        .define(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC)
+        .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC);
 
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 2a97de4..51cfe88 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -75,8 +75,8 @@ public class CamelSinkTask extends SinkTask {
             String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
             final String marshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
             final String unmarshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_UNMARSHAL_CONF);
-            final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF);
-            final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_TIMEOUT_CONF);
+            final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF);
+            final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF);
 
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 92878ae..bdfc5c7 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -19,12 +19,11 @@ package org.apache.camel.kafkaconnector;
 import java.util.Map;
 
 import org.apache.camel.LoggingLevel;
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 
-public class CamelSourceConnectorConfig extends AbstractConfig {
+public class CamelSourceConnectorConfig extends CamelConnectorConfig {
     public static final String CAMEL_SOURCE_UNMARSHAL_DEFAULT = null;
     public static final String CAMEL_SOURCE_UNMARSHAL_CONF = "camel.source.unmarshal";
     public static final String CAMEL_SOURCE_UNMARSHAL_DOC = "The camel dataformat name to use to unmarshal data from the source";
@@ -88,7 +87,10 @@ public class CamelSourceConnectorConfig extends AbstractConfig {
         .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
         .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC)
         .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING, CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC)
-        .define(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC);
+        .define(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC)
+        .define(CAMEL_CONNECTOR_AGGREGATE_CONF, Type.STRING, CAMEL_CONNECTOR_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_DOC)
+        .define(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC)
+        .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC);
 
     public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 604b9b1..96a7ad4 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -82,6 +82,8 @@ public class CamelSourceTask extends SourceTask {
             String remoteUrl = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF);
             final String unmarshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF);
             final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF);
+            final int size = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF);
+            final long timeout = config.getLong(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF);
 
             topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
@@ -98,8 +100,8 @@ public class CamelSourceTask extends SourceTask {
                 .withProperties(actualProps)
                 .withUnmarshallDataFormat(unmarshaller)
                 .withMarshallDataFormat(marshaller)
-                .withAggregationSize(10)
-                .withAggregationTimeout(500)
+                .withAggregationSize(size)
+                .withAggregationTimeout(timeout)
                 .build(camelContext);
 
             consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 4150dea..dcd6398 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -26,6 +26,7 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.kafkaconnector.CamelConnectorConfig;
 import org.apache.camel.main.BaseMainSupport;
 import org.apache.camel.main.MainListener;
 import org.apache.camel.model.RouteDefinition;
@@ -204,7 +205,7 @@ public class CamelKafkaConnectMain extends BaseMainSupport {
                     }
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {
                         //aggregation
-                        AggregationStrategy s = (AggregationStrategy) getContext().getRegistry().lookupByName("aggregate");
+                        AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
                         LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
                         LOG.info(".to({})", to);
                         rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 3136e98..6e28b0e 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -596,8 +596,8 @@ public class CamelSinkTaskTest {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
-        props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
-        props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF, "5");
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5");
         CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
@@ -631,9 +631,9 @@ public class CamelSinkTaskTest {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
-        props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
-        props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF, "5");
-        props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_TIMEOUT_CONF, "100");
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5");
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100");
         CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 4d268de..4a7b5b1 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -17,18 +17,24 @@
 package org.apache.camel.kafkaconnector;
 
 import java.math.BigDecimal;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.kafkaconnector.utils.StringJoinerAggregator;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.Test;
 
+import static org.apache.camel.util.CollectionHelper.mapOf;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -293,4 +299,83 @@ public class CamelSourceTaskTest {
 
         sourceTask.stop();
     }
+
+    @Test
+    public void testSourcePollingWithAggregationBySize() {
+        final int size = 10;
+        final int chunkSize = 5;
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(mapOf(
+            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+            CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator",
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|",
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize
+        ));
+
+        try {
+            assertThat(sourceTask.getCms().getCamelContext().getRegistry().lookupByName(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME))
+                .isInstanceOf(StringJoinerAggregator.class)
+                .hasFieldOrPropertyWithValue("delimiter", "|");
+
+            for (int i = 0; i < size; i++) {
+                sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI,  Integer.toString(i));
+            }
+
+            List<SourceRecord> records = sourceTask.poll();
+
+            assertThat(records).hasSize(size / chunkSize);
+
+            for (int i = 0; i < size / chunkSize; i++) {
+                assertThat(records)
+                    .element(i)
+                    .hasFieldOrPropertyWithValue(
+                        "value",
+                        IntStream.range(i * chunkSize, (i * chunkSize) + chunkSize).mapToObj(Integer::toString).collect(Collectors.joining("|"))
+                    );
+            }
+
+        } finally {
+            sourceTask.stop();
+        }
+    }
+
+    @Test
+    public void testSourcePollingWithAggregationBySizeAndTimeout() {
+        final int size = 3;
+        final int chunkSize = 2;
+        final long chunkTimeout = 500L;
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(mapOf(
+            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+            CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator",
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|",
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, chunkTimeout,
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize
+        ));
+
+        try {
+            assertThat(sourceTask.getCms().getCamelContext().getRegistry().lookupByName(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME))
+                .isInstanceOf(StringJoinerAggregator.class)
+                .hasFieldOrPropertyWithValue("delimiter", "|");
+
+            for (int i = 0; i < size; i++) {
+                sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, Integer.toString(i));
+            }
+
+            List<SourceRecord> records = new ArrayList<>();
+            while (records.size() < 2) {
+                records.addAll(sourceTask.poll());
+            }
+
+            assertThat(records).hasSize(2);
+            assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "0|1");
+            assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "2");
+        } finally {
+            sourceTask.stop();
+        }
+    }
 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/utils/StringJoinerAggregator.java b/core/src/test/java/org/apache/camel/kafkaconnector/utils/StringJoinerAggregator.java
new file mode 100644
index 0000000..f0f455b
--- /dev/null
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/utils/StringJoinerAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.utils;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+
+public class StringJoinerAggregator implements AggregationStrategy {
+    private String delimiter = ",";
+
+    public String getDelimiter() {
+        return delimiter;
+    }
+
+    public StringJoinerAggregator setDelimiter(String delimiter) {
+        this.delimiter = delimiter;
+        return this;
+    }
+
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // join the old body and the new body (old first), storing the result on the new exchange
+        if (oldExchange == null) {
+            return newExchange;
+        }
+
+        String body = oldExchange.getIn().getBody(String.class);
+        if (body != null) {
+            Message newIn = newExchange.getIn();
+            String newBody = newIn.getBody(String.class);
+            if (newBody != null) {
+                body += delimiter + newBody;
+            }
+
+            newIn.setBody(body);
+        }
+        return newExchange;
+    }
+}