You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/10/08 10:22:44 UTC
[camel-kafka-connector] branch master updated: core: add support
for aggreation on source sinks
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new a0c1c60 core: add support for aggreation on source sinks
a0c1c60 is described below
commit a0c1c6015fe05ce9ab473d826401990811591dd1
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Thu Oct 8 11:03:09 2020 +0200
core: add support for aggreation on source sinks
---
core/pom.xml | 5 ++
.../camel/kafkaconnector/CamelConnectorConfig.java | 49 +++++++++++++
.../kafkaconnector/CamelSinkConnectorConfig.java | 21 +-----
.../apache/camel/kafkaconnector/CamelSinkTask.java | 4 +-
.../kafkaconnector/CamelSourceConnectorConfig.java | 8 +-
.../camel/kafkaconnector/CamelSourceTask.java | 6 +-
.../utils/CamelKafkaConnectMain.java | 3 +-
.../camel/kafkaconnector/CamelSinkTaskTest.java | 10 +--
.../camel/kafkaconnector/CamelSourceTaskTest.java | 85 ++++++++++++++++++++++
.../utils/StringJoinerAggregator.java | 54 ++++++++++++++
10 files changed, 215 insertions(+), 30 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index fe05166..61454c1 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -141,6 +141,11 @@
<artifactId>camel-aws2-sqs</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
new file mode 100644
index 0000000..fd62052
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+public abstract class CamelConnectorConfig extends AbstractConfig {
+ public static final String CAMEL_CONNECTOR_AGGREGATE_DEFAULT = null;
+ public static final String CAMEL_CONNECTOR_AGGREGATE_NAME = "aggregate";
+ public static final String CAMEL_CONNECTOR_AGGREGATE_CONF = "camel.beans." + CAMEL_CONNECTOR_AGGREGATE_NAME;
+ public static final String CAMEL_CONNECTOR_AGGREGATE_DOC = "A reference to an aggregate bean, in the form of #class:";
+
+ public static final Integer CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT = 10;
+ public static final String CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF = "camel.beans.aggregation.size";
+ public static final String CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC = "The size of the aggregation, to be used in combination with camel.beans.aggregate";
+
+ public static final Long CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT = 500L;
+ public static final String CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF = "camel.beans.aggregation.timeout";
+ public static final String CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC = "The timeout of the aggregation, to be used in combination with camel.beans.aggregate";
+
+ protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
+ super(definition, originals, configProviderProps, doLog);
+ }
+
+ protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals) {
+ super(definition, originals);
+ }
+
+ protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
+ super(definition, originals, doLog);
+ }
+}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index 9b4b2df..980aad1 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -19,12 +19,11 @@ package org.apache.camel.kafkaconnector;
import java.util.Map;
import org.apache.camel.LoggingLevel;
-import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
-public class CamelSinkConnectorConfig extends AbstractConfig {
+public class CamelSinkConnectorConfig extends CamelConnectorConfig {
public static final String CAMEL_SINK_MARSHAL_DEFAULT = null;
public static final String CAMEL_SINK_MARSHAL_CONF = "camel.sink.marshal";
public static final String CAMEL_SINK_MARSHAL_DOC = "The camel dataformat name to use to marshal data to the destination";
@@ -47,27 +46,15 @@ public class CamelSinkConnectorConfig extends AbstractConfig {
public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_CONF = "camel.sink.contentLogLevel";
public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_DOC = "Log level for the record's content (default: " + CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT + "). Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF.";
- public static final String CAMEL_SINK_AGGREGATE_DEFAULT = null;
- public static final String CAMEL_SINK_AGGREGATE_CONF = "camel.beans.aggregate";
- public static final String CAMEL_SINK_AGGREGATE_DOC = "A reference to an aggregate bean, in the form of #class:";
-
- public static final Integer CAMEL_SINK_AGGREGATE_SIZE_DEFAULT = 10;
- public static final String CAMEL_SINK_AGGREGATE_SIZE_CONF = "camel.beans.aggregation.size";
- public static final String CAMEL_SINK_AGGREGATE_SIZE_DOC = "The size of the aggregation, to be used in combination with camel.beans.aggregate";
-
- public static final Long CAMEL_SINK_AGGREGATE_TIMEOUT_DEFAULT = 500L;
- public static final String CAMEL_SINK_AGGREGATE_TIMEOUT_CONF = "camel.beans.aggregation.timeout";
- public static final String CAMEL_SINK_AGGREGATE_TIMEOUT_DOC = "The timeout of the aggregation, to be used in combination with camel.beans.aggregate";
-
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC)
.define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
.define(CAMEL_SINK_UNMARSHAL_CONF, Type.STRING, CAMEL_SINK_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_UNMARSHAL_DOC)
.define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC)
.define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SINK_CONTENT_LOG_LEVEL_DOC)
- .define(CAMEL_SINK_AGGREGATE_CONF, Type.STRING, CAMEL_SINK_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_DOC)
- .define(CAMEL_SINK_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_SINK_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_SIZE_DOC)
- .define(CAMEL_SINK_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_SINK_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_TIMEOUT_DOC);
+ .define(CAMEL_CONNECTOR_AGGREGATE_CONF, Type.STRING, CAMEL_CONNECTOR_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_DOC)
+ .define(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC)
+ .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC);
public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 2a97de4..51cfe88 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -75,8 +75,8 @@ public class CamelSinkTask extends SinkTask {
String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
final String marshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
final String unmarshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_UNMARSHAL_CONF);
- final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF);
- final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_TIMEOUT_CONF);
+ final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF);
+ final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF);
CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 92878ae..bdfc5c7 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -19,12 +19,11 @@ package org.apache.camel.kafkaconnector;
import java.util.Map;
import org.apache.camel.LoggingLevel;
-import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
-public class CamelSourceConnectorConfig extends AbstractConfig {
+public class CamelSourceConnectorConfig extends CamelConnectorConfig {
public static final String CAMEL_SOURCE_UNMARSHAL_DEFAULT = null;
public static final String CAMEL_SOURCE_UNMARSHAL_CONF = "camel.source.unmarshal";
public static final String CAMEL_SOURCE_UNMARSHAL_DOC = "The camel dataformat name to use to unmarshal data from the source";
@@ -88,7 +87,10 @@ public class CamelSourceConnectorConfig extends AbstractConfig {
.define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
.define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC)
.define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING, CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC)
- .define(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC);
+ .define(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC)
+ .define(CAMEL_CONNECTOR_AGGREGATE_CONF, Type.STRING, CAMEL_CONNECTOR_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_DOC)
+ .define(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC)
+ .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC);
public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 604b9b1..96a7ad4 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -82,6 +82,8 @@ public class CamelSourceTask extends SourceTask {
String remoteUrl = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF);
final String unmarshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF);
final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF);
+ final int size = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF);
+ final long timeout = config.getLong(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF);
topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
@@ -98,8 +100,8 @@ public class CamelSourceTask extends SourceTask {
.withProperties(actualProps)
.withUnmarshallDataFormat(unmarshaller)
.withMarshallDataFormat(marshaller)
- .withAggregationSize(10)
- .withAggregationTimeout(500)
+ .withAggregationSize(size)
+ .withAggregationTimeout(timeout)
.build(camelContext);
consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 4150dea..dcd6398 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -26,6 +26,7 @@ import org.apache.camel.CamelContextAware;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.kafkaconnector.CamelConnectorConfig;
import org.apache.camel.main.BaseMainSupport;
import org.apache.camel.main.MainListener;
import org.apache.camel.model.RouteDefinition;
@@ -204,7 +205,7 @@ public class CamelKafkaConnectMain extends BaseMainSupport {
}
if (getContext().getRegistry().lookupByName("aggregate") != null) {
//aggregation
- AggregationStrategy s = (AggregationStrategy) getContext().getRegistry().lookupByName("aggregate");
+ AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
LOG.info(".to({})", to);
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 3136e98..6e28b0e 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -596,8 +596,8 @@ public class CamelSinkTaskTest {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
- props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF, "5");
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5");
CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
@@ -631,9 +631,9 @@ public class CamelSinkTaskTest {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
- props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF, "5");
- props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_TIMEOUT_CONF, "100");
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5");
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100");
CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 4d268de..4a7b5b1 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -17,18 +17,24 @@
package org.apache.camel.kafkaconnector;
import java.math.BigDecimal;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.camel.LoggingLevel;
import org.apache.camel.ProducerTemplate;
+import org.apache.camel.kafkaconnector.utils.StringJoinerAggregator;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Test;
+import static org.apache.camel.util.CollectionHelper.mapOf;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -293,4 +299,83 @@ public class CamelSourceTaskTest {
sourceTask.stop();
}
+
+ @Test
+ public void testSourcePollingWithAggregationBySize() {
+ final int size = 10;
+ final int chunkSize = 5;
+
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(mapOf(
+ CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+ CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator",
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|",
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize
+ ));
+
+ try {
+ assertThat(sourceTask.getCms().getCamelContext().getRegistry().lookupByName(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME))
+ .isInstanceOf(StringJoinerAggregator.class)
+ .hasFieldOrPropertyWithValue("delimiter", "|");
+
+ for (int i = 0; i < size; i++) {
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, Integer.toString(i));
+ }
+
+ List<SourceRecord> records = sourceTask.poll();
+
+ assertThat(records).hasSize(size / chunkSize);
+
+ for (int i = 0; i < size / chunkSize; i++) {
+ assertThat(records)
+ .element(i)
+ .hasFieldOrPropertyWithValue(
+ "value",
+ IntStream.range(i * chunkSize, (i * chunkSize) + chunkSize).mapToObj(Integer::toString).collect(Collectors.joining("|"))
+ );
+ }
+
+ } finally {
+ sourceTask.stop();
+ }
+ }
+
+ @Test
+ public void testSourcePollingWithAggregationBySizeAndTimeout() {
+ final int size = 3;
+ final int chunkSize = 2;
+ final long chunkTimeout = 500L;
+
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(mapOf(
+ CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+ CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator",
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|",
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, chunkTimeout,
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize
+ ));
+
+ try {
+ assertThat(sourceTask.getCms().getCamelContext().getRegistry().lookupByName(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME))
+ .isInstanceOf(StringJoinerAggregator.class)
+ .hasFieldOrPropertyWithValue("delimiter", "|");
+
+ for (int i = 0; i < size; i++) {
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, Integer.toString(i));
+ }
+
+ List<SourceRecord> records = new ArrayList<>();
+ while (records.size() < 2) {
+ records.addAll(sourceTask.poll());
+ }
+
+ assertThat(records).hasSize(2);
+ assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "0|1");
+ assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "2");
+ } finally {
+ sourceTask.stop();
+ }
+ }
}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/utils/StringJoinerAggregator.java b/core/src/test/java/org/apache/camel/kafkaconnector/utils/StringJoinerAggregator.java
new file mode 100644
index 0000000..f0f455b
--- /dev/null
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/utils/StringJoinerAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.utils;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+
+public class StringJoinerAggregator implements AggregationStrategy {
+ private String delimiter = ",";
+
+ public String getDelimiter() {
+ return delimiter;
+ }
+
+ public StringJoinerAggregator setDelimiter(String delimiter) {
+ this.delimiter = delimiter;
+ return this;
+ }
+
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ // lets append the old body to the new body
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ String body = oldExchange.getIn().getBody(String.class);
+ if (body != null) {
+ Message newIn = newExchange.getIn();
+ String newBody = newIn.getBody(String.class);
+ if (newBody != null) {
+ body += delimiter + newBody;
+ }
+
+ newIn.setBody(body);
+ }
+ return newExchange;
+ }
+}