You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/24 08:19:25 UTC
[camel-kafka-connector] 03/18: Added Memory Idempotency support for
both sink and source
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 5465d13ef8f012a389123e3523c471e6c40cacc7
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Nov 17 19:40:15 2020 +0100
Added Memory Idempotency support for both sink and source
---
.../camel/kafkaconnector/CamelConnectorConfig.java | 16 ++
.../kafkaconnector/CamelSinkConnectorConfig.java | 8 +-
.../apache/camel/kafkaconnector/CamelSinkTask.java | 9 ++
.../kafkaconnector/CamelSourceConnectorConfig.java | 8 +-
.../camel/kafkaconnector/CamelSourceTask.java | 9 +-
.../utils/CamelKafkaConnectMain.java | 69 ++++++++-
.../camel/kafkaconnector/CamelSinkTaskTest.java | 172 +++++++++++++++++++++
.../camel/kafkaconnector/CamelSourceTaskTest.java | 99 ++++++++++++
8 files changed, 379 insertions(+), 11 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
index 4271938..0332576 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -47,6 +47,22 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF = "camel.error.handler.redelivery.delay";
public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC = "The initial redelivery delay in milliseconds in case of Default Error Handler";
+ public static final Boolean CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT = false;
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF = "camel.idempotency.enabled";
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC = "If in memory idempotency must be enabled or not";
+
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT = "body";
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF = "camel.idempotency.expression.type";
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC = "How the idempotency will be evaluated: possible values are body and header";
+
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT = null;
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF = "camel.idempotency.expression.header";
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC = "The header name that will be evaluated in case of camel.idempotency.expression.type equals to header";
+
+ public static final int CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT = 100;
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF = "camel.idempotency.memory.dimension";
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC = "The Memory dimension of the in memory idempotent Repository";
+
protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
super(definition, originals, configProviderProps, doLog);
}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index e86e921..299d578 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -57,8 +57,12 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig {
.define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC)
.define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC)
.define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC)
- .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC);
-
+ .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC)
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC)
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC)
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC)
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC);
+
public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index a38afea..2cc01a2 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -85,6 +85,11 @@ public class CamelSinkTask extends SinkTask {
final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF);
final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF);
final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
+ final Boolean idempotencyEnabled = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF);
+ final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
+ final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
+ final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
+
CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
remoteUrl = TaskHelper.buildUrl(camelContext,
@@ -103,6 +108,10 @@ public class CamelSinkTask extends SinkTask {
.withErrorHandler(errorHandler)
.withMaxRedeliveries(maxRedeliveries)
.withRedeliveryDelay(redeliveryDelay)
+ .withIdempotencyEnabled(idempotencyEnabled)
+ .withExpressionType(expressionType)
+ .withExpressionHeader(expressionHeader)
+ .withMemoryDimension(memoryDimension)
.build(camelContext);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 2ecfb45..70e2b09 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -93,8 +93,12 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
.define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC)
.define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC)
.define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC)
- .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC);
-
+ .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC)
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC)
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC)
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC)
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC);
+
public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 165ffff..e32d1c2 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -91,7 +91,10 @@ public class CamelSourceTask extends SourceTask {
final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF);
final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF);
final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
-
+ final Boolean idempotencyEnabled = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF);
+ final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
+ final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
+ final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
String localUrl = getLocalUrlWithPollingOptions(config);
@@ -113,6 +116,10 @@ public class CamelSourceTask extends SourceTask {
.withErrorHandler(errorHandler)
.withMaxRedeliveries(maxRedeliveries)
.withRedeliveryDelay(redeliveryDelay)
+ .withIdempotencyEnabled(idempotencyEnabled)
+ .withExpressionType(expressionType)
+ .withExpressionHeader(expressionHeader)
+ .withMemoryDimension(memoryDimension)
.build(camelContext);
consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 7a43f0f..d0695bd 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -28,6 +28,7 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.kafkaconnector.CamelConnectorConfig;
import org.apache.camel.main.SimpleMain;
import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
@@ -94,6 +95,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
private String errorHandler;
private int maxRedeliveries;
private long redeliveryDelay;
+ private boolean idempotencyEnabled;
+ private String expressionType;
+ private String expressionHeader;
+ private int memoryDimension;
public Builder(String from, String to) {
this.from = from;
@@ -139,6 +144,26 @@ public class CamelKafkaConnectMain extends SimpleMain {
this.redeliveryDelay = redeliveryDelay;
return this;
}
+
+ public Builder withIdempotencyEnabled(boolean idempotencyEnabled) {
+ this.idempotencyEnabled = idempotencyEnabled;
+ return this;
+ }
+
+ public Builder withExpressionType(String expressionType) {
+ this.expressionType = expressionType;
+ return this;
+ }
+
+ public Builder withExpressionHeader(String expressionHeader) {
+ this.expressionHeader = expressionHeader;
+ return this;
+ }
+
+ public Builder withMemoryDimension(int memoryDimension) {
+ this.memoryDimension = memoryDimension;
+ return this;
+ }
public CamelKafkaConnectMain build(CamelContext camelContext) {
CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
@@ -183,13 +208,45 @@ public class CamelKafkaConnectMain extends SimpleMain {
if (getContext().getRegistry().lookupByName("aggregate") != null) {
//aggregation
AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
- LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
- LOG.info(".to({})", to);
- rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
+ if (idempotencyEnabled) {
+ switch (expressionType) {
+ case "body":
+ LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
+ LOG.info(".to({})", to);
+ rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+ break;
+ case "header":
+ LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
+ LOG.info(".to({})", to);
+ rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+ break;
+ default:
+ break;
+ }
+ } else {
+ LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
+ LOG.info(".to({})", to);
+ rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
+ }
} else {
- //to
- LOG.info(".to({})", to);
- rd.toD(to);
+ if (idempotencyEnabled) {
+ switch (expressionType) {
+ case "body":
+ LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
+ rd.idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+ break;
+ case "header":
+ LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
+ rd.idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+ break;
+ default:
+ break;
+ }
+ } else {
+ //to
+ LOG.info(".to({})", to);
+ rd.toD(to);
+ }
}
}
});
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 6559456..9472206 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -29,8 +29,10 @@ import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;
@@ -663,6 +665,176 @@ public class CamelSinkTaskTest {
sinkTask.stop();
}
+
+ @Test
+ public void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException {
+ Map<String, String> props = new HashMap<>();
+ props.put(TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5");
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100");
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body");
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
+
+ List<SinkRecord> records = new ArrayList<SinkRecord>();
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ SinkRecord record1 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel1", 42);
+ SinkRecord record2 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel2", 42);
+ SinkRecord record3 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel3", 42);
+ SinkRecord record4 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel4", 42);
+ SinkRecord record5 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ SinkRecord record6 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel1", 42);
+ SinkRecord record7 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel2", 42);
+ SinkRecord record8 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel3", 42);
+ SinkRecord record9 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel4", 42);
+ SinkRecord record10 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel5", 42);
+ SinkRecord record11 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel6", 42);
+ SinkRecord record12 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel7", 42);
+ SinkRecord record13 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel8", 42);
+ SinkRecord record14 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel9", 42);
+ records.add(record);
+ records.add(record1);
+ records.add(record2);
+ records.add(record3);
+ records.add(record4);
+ records.add(record5);
+ records.add(record6);
+ records.add(record7);
+ records.add(record8);
+ records.add(record9);
+ records.add(record10);
+ records.add(record11);
+ records.add(record12);
+ records.add(record13);
+ records.add(record14);
+
+ sinkTask.put(records);
+
+ ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+ assertEquals("camel camel1 camel2 camel3 camel4", exchange.getMessage().getBody());
+ assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+ .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+
+ exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+ assertEquals("camel5 camel6 camel7 camel8 camel9", exchange.getMessage().getBody());
+ assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+ .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+
+ sinkTask.stop();
+ }
+
+ @Test
+ public void testWithIdempotency() throws InterruptedException {
+ Map<String, String> props = new HashMap<>();
+ props.put(TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body");
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
+
+ List<SinkRecord> records = new ArrayList<SinkRecord>();
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ SinkRecord record1 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ SinkRecord record2 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ SinkRecord record3 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ SinkRecord record4 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel1", 42);
+ SinkRecord record5 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ SinkRecord record6 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ SinkRecord record7 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ SinkRecord record8 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ SinkRecord record9 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel2", 42);
+ SinkRecord record10 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ SinkRecord record11 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ SinkRecord record12 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ SinkRecord record13 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ SinkRecord record14 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ records.add(record);
+ records.add(record1);
+ records.add(record2);
+ records.add(record3);
+ records.add(record4);
+ records.add(record5);
+ records.add(record6);
+ records.add(record7);
+ records.add(record8);
+ records.add(record9);
+ records.add(record10);
+ records.add(record11);
+ records.add(record12);
+ records.add(record13);
+ records.add(record14);
+
+ sinkTask.put(records);
+
+ ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+ assertEquals("camel", exchange.getMessage().getBody());
+ assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+ .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+
+ exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+ assertEquals("camel1", exchange.getMessage().getBody());
+ assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+ .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+
+ exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+ assertEquals("camel2", exchange.getMessage().getBody());
+ assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+ .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+
+ sinkTask.stop();
+ }
+
+ @Test
+ public void testWithIdempotencyAndHeader() throws InterruptedException {
+ Map<String, String> props = new HashMap<>();
+ props.put(TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header");
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency");
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
+
+ List<SinkRecord> records = new ArrayList<SinkRecord>();
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ record.headers().add("CamelHeader.headerIdempotency", new SchemaAndValue(Schema.STRING_SCHEMA, "Test"));
+ SinkRecord record1 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ record1.headers().add("CamelHeader.headerIdempotency", new SchemaAndValue(Schema.STRING_SCHEMA, "Test"));
+ SinkRecord record2 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel1", 42);
+ record2.headers().add("CamelHeader.headerIdempotency", new SchemaAndValue(Schema.STRING_SCHEMA, "Test1"));
+
+ records.add(record);
+ records.add(record1);
+ records.add(record2);
+
+ sinkTask.put(records);
+
+ ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+ assertEquals("camel", exchange.getMessage().getBody());
+ assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+ .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+
+ exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+ assertEquals("camel1", exchange.getMessage().getBody());
+ assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+ .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+
+ sinkTask.stop();
+ }
@Test
public void testSecretRaw() {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index ef393f9..ea7bb1b 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -436,4 +436,103 @@ public class CamelSourceTaskTest {
sourceTask.stop();
}
}
+
+ @Test
+ public void testSourcePollingWithIdempotencyEnabledAndBody() {
+
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(mapOf(
+ CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+ CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body"
+ ));
+
+ try {
+
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test1");
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test2");
+
+ List<SourceRecord> records = sourceTask.poll();
+
+ assertThat(records).hasSize(3);
+ assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "Test");
+ assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "Test1");
+ assertThat(records).element(2).hasFieldOrPropertyWithValue("value", "Test2");
+ } finally {
+ sourceTask.stop();
+ }
+ }
+
+ @Test
+ public void testSourcePollingWithIdempotencyEnabledAndHeader() {
+
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(mapOf(
+ CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+ CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header",
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency"
+ ));
+
+ try {
+
+ sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test", "headerIdempotency", "Test");
+ sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test1", "headerIdempotency", "Test1");
+ sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "TestTest", "headerIdempotency", "Test");
+ sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test2", "headerIdempotency", "Test2");
+
+ List<SourceRecord> records = sourceTask.poll();
+
+ assertThat(records).hasSize(3);
+ assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "Test");
+ assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "Test1");
+ assertThat(records).element(2).hasFieldOrPropertyWithValue("value", "Test2");
+ } finally {
+ sourceTask.stop();
+ }
+ }
+
+ @Test
+ public void testSourcePollingWithAggregationAndIdempotencyBySizeAndTimeout() {
+ final int chunkSize = 2;
+
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(mapOf(
+ CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+ CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body",
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator",
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|",
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize
+ ));
+
+ try {
+ assertThat(sourceTask.getCms().getCamelContext().getRegistry().lookupByName(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME))
+ .isInstanceOf(StringJoinerAggregator.class)
+ .hasFieldOrPropertyWithValue("delimiter", "|");
+
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 0);
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 1);
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 2);
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 3);
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 0);
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 1);
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 3);
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 2);
+
+ List<SourceRecord> records = sourceTask.poll();
+
+ assertThat(records).hasSize(3);
+ assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "0|1");
+ assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "2|3");
+ assertThat(records).element(2).hasFieldOrPropertyWithValue("value", "3|2");
+ } finally {
+ sourceTask.stop();
+ }
+ }
}