You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/24 06:48:23 UTC

[camel-kafka-connector] 04/17: Fixed CS

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master-align-and-rebase
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit a452db9047904007b84cf7a076e6b16f15d572e2
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Nov 17 21:51:54 2020 +0100

    Fixed CS
---
 .../camel/kafkaconnector/CamelSourceTask.java      |  1 +
 .../utils/CamelKafkaConnectMain.java               | 33 ++++++------
 .../camel/kafkaconnector/CamelSinkTaskTest.java    |  1 -
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 59 +++++++++-------------
 4 files changed, 43 insertions(+), 51 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index e32d1c2..4e893ab 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -95,6 +95,7 @@ public class CamelSourceTask extends SourceTask {
             final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
             final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
             final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
+            
             topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
             String localUrl = getLocalUrlWithPollingOptions(config);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index d0695bd..4954ed9 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -146,23 +146,23 @@ public class CamelKafkaConnectMain extends SimpleMain {
         }
         
         public Builder withIdempotencyEnabled(boolean idempotencyEnabled) {
-        	this.idempotencyEnabled = idempotencyEnabled;
-        	return this;
+            this.idempotencyEnabled = idempotencyEnabled;
+            return this;
         }
         
         public Builder withExpressionType(String expressionType) {
-        	this.expressionType = expressionType;
-        	return this;
+            this.expressionType = expressionType;
+            return this;
         }
         
         public Builder withExpressionHeader(String expressionHeader) {
-        	this.expressionHeader = expressionHeader;
-        	return this;
+            this.expressionHeader = expressionHeader;
+            return this;
         }
         
         public Builder withMemoryDimension(int memoryDimension) {
-        	this.memoryDimension = memoryDimension;
-        	return this;
+            this.memoryDimension = memoryDimension;
+            return this;
         }
 
         public CamelKafkaConnectMain build(CamelContext camelContext) {
@@ -211,14 +211,17 @@ public class CamelKafkaConnectMain extends SimpleMain {
                         if (idempotencyEnabled) {
                             switch (expressionType) {
                                 case "body":
-                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
+                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), "
+                                           + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
                                     LOG.info(".to({})", to);
-                                	rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
                                     break;
                                 case "header":
-                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
+                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), "
+                                           + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
                                     LOG.info(".to({})", to);
-                                    rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
+                                        .idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
                                     break;
                                 default:
                                     break;
@@ -232,11 +235,11 @@ public class CamelKafkaConnectMain extends SimpleMain {
                         if (idempotencyEnabled) {
                             switch (expressionType) {
                                 case "body":
-                                	LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
-                                	rd.idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
+                                    rd.idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
                                     break;
                                 case "header":
-                                	LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
+                                    LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
                                     rd.idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
                                     break;
                                 default:
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 9472206..96e7eb6 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.Test;
 
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index ea7bb1b..c6fa7eb 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -441,19 +441,16 @@ public class CamelSourceTaskTest {
     public void testSourcePollingWithIdempotencyEnabledAndBody() {
 
         CamelSourceTask sourceTask = new CamelSourceTask();
-        sourceTask.start(mapOf(
-            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
-            CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body"
-        ));
+        sourceTask.start(mapOf(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+                               CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
+                               CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body"));
 
         try {
 
-        	sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
-        	sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test1");
-        	sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
-        	sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test2");
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test1");
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test");
+            sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test2");
 
             List<SourceRecord> records = sourceTask.poll();
 
@@ -465,25 +462,22 @@ public class CamelSourceTaskTest {
             sourceTask.stop();
         }
     }
-    
+
     @Test
     public void testSourcePollingWithIdempotencyEnabledAndHeader() {
 
         CamelSourceTask sourceTask = new CamelSourceTask();
-        sourceTask.start(mapOf(
-            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
-            CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header",
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency"
-        ));
+        sourceTask
+            .start(mapOf(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+                         CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF,
+                         "header", CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency"));
 
         try {
 
-        	sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test", "headerIdempotency", "Test");
-        	sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test1", "headerIdempotency", "Test1");
-        	sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "TestTest", "headerIdempotency", "Test");
-        	sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test2", "headerIdempotency", "Test2");
+            sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test", "headerIdempotency", "Test");
+            sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test1", "headerIdempotency", "Test1");
+            sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "TestTest", "headerIdempotency", "Test");
+            sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test2", "headerIdempotency", "Test2");
 
             List<SourceRecord> records = sourceTask.poll();
 
@@ -495,26 +489,21 @@ public class CamelSourceTaskTest {
             sourceTask.stop();
         }
     }
-    
+
     @Test
     public void testSourcePollingWithAggregationAndIdempotencyBySizeAndTimeout() {
         final int chunkSize = 2;
 
         CamelSourceTask sourceTask = new CamelSourceTask();
-        sourceTask.start(mapOf(
-            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
-            CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true,
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body",
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator",
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|",
-            CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize
-        ));
+        sourceTask
+            .start(mapOf(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+                         CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF,
+                         "body", CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator",
+                         CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|", CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize));
 
         try {
             assertThat(sourceTask.getCms().getCamelContext().getRegistry().lookupByName(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME))
-                .isInstanceOf(StringJoinerAggregator.class)
-                .hasFieldOrPropertyWithValue("delimiter", "|");
+                .isInstanceOf(StringJoinerAggregator.class).hasFieldOrPropertyWithValue("delimiter", "|");
 
             sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 0);
             sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 1);
@@ -524,7 +513,7 @@ public class CamelSourceTaskTest {
             sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 1);
             sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 3);
             sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 2);
-            
+
             List<SourceRecord> records = sourceTask.poll();
 
             assertThat(records).hasSize(3);