You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/17 20:52:41 UTC
[camel-kafka-connector] 02/02: Fixed CS
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch idempotency
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 8cd6e2cef43d4951f1c2895b5a2c8a1d10ca6e6e
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Nov 17 21:51:54 2020 +0100
Fixed CS
---
.../camel/kafkaconnector/CamelSourceTask.java | 1 +
.../utils/CamelKafkaConnectMain.java | 33 ++++++------
.../camel/kafkaconnector/CamelSinkTaskTest.java | 1 -
.../camel/kafkaconnector/CamelSourceTaskTest.java | 59 +++++++++-------------
4 files changed, 43 insertions(+), 51 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index e32d1c2..4e893ab 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -95,6 +95,7 @@ public class CamelSourceTask extends SourceTask {
final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
+
topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
String localUrl = getLocalUrlWithPollingOptions(config);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index d0695bd..4954ed9 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -146,23 +146,23 @@ public class CamelKafkaConnectMain extends SimpleMain {
}
public Builder withIdempotencyEnabled(boolean idempotencyEnabled) {
- this.idempotencyEnabled = idempotencyEnabled;
- return this;
+ this.idempotencyEnabled = idempotencyEnabled;
+ return this;
}
public Builder withExpressionType(String expressionType) {
- this.expressionType = expressionType;
- return this;
+ this.expressionType = expressionType;
+ return this;
}
public Builder withExpressionHeader(String expressionHeader) {
- this.expressionHeader = expressionHeader;
- return this;
+ this.expressionHeader = expressionHeader;
+ return this;
}
public Builder withMemoryDimension(int memoryDimension) {
- this.memoryDimension = memoryDimension;
- return this;
+ this.memoryDimension = memoryDimension;
+ return this;
}
public CamelKafkaConnectMain build(CamelContext camelContext) {
@@ -211,14 +211,17 @@ public class CamelKafkaConnectMain extends SimpleMain {
if (idempotencyEnabled) {
switch (expressionType) {
case "body":
- LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
+ LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), "
+ + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
LOG.info(".to({})", to);
- rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+ rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
break;
case "header":
- LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
+ LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), "
+ + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
LOG.info(".to({})", to);
- rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+ rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
+ .idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
break;
default:
break;
@@ -232,11 +235,11 @@ public class CamelKafkaConnectMain extends SimpleMain {
if (idempotencyEnabled) {
switch (expressionType) {
case "body":
- LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
- rd.idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+ LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
+ rd.idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
break;
case "header":
- LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
+ LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
rd.idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
break;
default:
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 9472206..96e7eb6 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index ea7bb1b..c6fa7eb 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -441,19 +441,16 @@ public class CamelSourceTaskTest {
public void testSourcePollingWithIdempotencyEnabledAndBody() {
CamelSourceTask sourceTask = new CamelSourceTask();
- sourceTask.start(mapOf(
- CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
- CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
- CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
- CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body"
- ));
+ sourceTask.start(mapOf(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body"));
try {
- sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
- sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test1");
- sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
- sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test2");
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test1");
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
+ sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test2");
List<SourceRecord> records = sourceTask.poll();
@@ -465,25 +462,22 @@ public class CamelSourceTaskTest {
sourceTask.stop();
}
}
-
+
@Test
public void testSourcePollingWithIdempotencyEnabledAndHeader() {
CamelSourceTask sourceTask = new CamelSourceTask();
- sourceTask.start(mapOf(
- CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
- CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
- CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
- CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header",
- CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency"
- ));
+ sourceTask
+ .start(mapOf(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF,
+ "header", CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency"));
try {
- sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test", "headerIdempotency", "Test");
- sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test1", "headerIdempotency", "Test1");
- sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "TestTest", "headerIdempotency", "Test");
- sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test2", "headerIdempotency", "Test2");
+ sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test", "headerIdempotency", "Test");
+ sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test1", "headerIdempotency", "Test1");
+ sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "TestTest", "headerIdempotency", "Test");
+ sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test2", "headerIdempotency", "Test2");
List<SourceRecord> records = sourceTask.poll();
@@ -495,26 +489,21 @@ public class CamelSourceTaskTest {
sourceTask.stop();
}
}
-
+
@Test
public void testSourcePollingWithAggregationAndIdempotencyBySizeAndTimeout() {
final int chunkSize = 2;
CamelSourceTask sourceTask = new CamelSourceTask();
- sourceTask.start(mapOf(
- CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
- CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
- CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
- CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body",
- CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator",
- CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|",
- CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize
- ));
+ sourceTask
+ .start(mapOf(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF,
+ "body", CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator",
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|", CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize));
try {
assertThat(sourceTask.getCms().getCamelContext().getRegistry().lookupByName(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME))
- .isInstanceOf(StringJoinerAggregator.class)
- .hasFieldOrPropertyWithValue("delimiter", "|");
+ .isInstanceOf(StringJoinerAggregator.class).hasFieldOrPropertyWithValue("delimiter", "|");
sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 0);
sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 1);
@@ -524,7 +513,7 @@ public class CamelSourceTaskTest {
sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 1);
sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 3);
sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 2);
-
+
List<SourceRecord> records = sourceTask.poll();
assertThat(records).hasSize(3);