You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2022/10/17 09:16:34 UTC

[camel-kafka-connector] branch main updated (86d4b0c04 -> 44def3696)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


    from 86d4b0c04 Updated CHANGELOG.md
     new 2a36be113 fix #1447 Fixed NPE error during SinkTask header mapping
     new 13656b6ca fix #1447 Unused import removal
     new 44def3696 fix #1447 Camel version to 3.18.1

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/kafkaconnector/CamelSinkTask.java |   3 +-
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 187 ++++++++-------------
 parent/pom.xml                                     |   2 +-
 3 files changed, 77 insertions(+), 115 deletions(-)


[camel-kafka-connector] 02/03: fix #1447 Unused import removal

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 13656b6ca5a2b0e43148278cbbccb103ece037b8
Author: Jakub Malek <ja...@webfleet.com>
AuthorDate: Wed Oct 12 12:41:30 2022 +0200

    fix #1447 Unused import removal
---
 .../src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 598d7a3d2..eafcd45e1 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -34,7 +34,6 @@ import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.SimpleHeaderConverter;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;


[camel-kafka-connector] 01/03: fix #1447 Fixed NPE error during SinkTask header mapping

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 2a36be113d76a6a306a4934c2e7406b0d75f96df
Author: Jakub Malek <ja...@gmail.com>
AuthorDate: Wed Oct 12 12:32:36 2022 +0200

    fix #1447 Fixed NPE error during SinkTask header mapping
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java |   3 +-
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 188 ++++++++-------------
 2 files changed, 77 insertions(+), 114 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index a53f298c5..b66bce262 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -240,7 +240,8 @@ public class CamelSinkTask extends SinkTask {
         final String key = StringHelper.after(header.key(), prefix, header.key());
         final Schema schema = header.schema();
 
-        if (schema.type().equals(Schema.BYTES_SCHEMA.type())
+        if (schema != null
+                && schema.type().equals(Schema.BYTES_SCHEMA.type())
                 && Objects.equals(schema.name(), Decimal.LOGICAL_NAME)
                 && header.value() instanceof byte[]) {
             destination.put(key, Decimal.toLogical(schema, (byte[]) header.value()));
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index c08a4f74c..598d7a3d2 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.kafkaconnector;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -33,7 +34,10 @@ import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.SimpleHeaderConverter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -43,20 +47,32 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class CamelSinkTaskTest {
+class CamelSinkTaskTest {
 
     private static final String SEDA_URI = "seda:test";
     private static final String TOPIC_NAME = "my-topic";
     private static final long RECEIVE_TIMEOUT = 1_000;
     private static final String TOPIC_CONF = "topics";
 
+    private CamelSinkTask sinkTask;
+
+    @BeforeEach
+    void setup() {
+        sinkTask = new CamelSinkTask();
+    }
+
+    @AfterEach
+    void tearDown() {
+        sinkTask.stop();
+    }
+
+
     @Test
-    public void testOnlyBody() {
+    void testOnlyBody() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -70,17 +86,14 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testTopicsRegex() {
+    void testTopicsRegex() {
         Map<String, String> props = new HashMap<>();
         props.put("topics.regex", "topic1*");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -99,17 +112,14 @@ public class CamelSinkTaskTest {
         Exchange exchange1 = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("cameltopicregex", exchange1.getMessage().getBody());
         assertEquals("test", exchange1.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndHeaders() {
+    void testBodyAndHeaders() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -155,18 +165,15 @@ public class CamelSinkTaskTest {
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
         assertEquals(kafkaBigDecimal, exchange.getIn().getHeader("KafkaBigDecimal", BigDecimal.class));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testBodyAndHeadersExclusions() {
+    void testBodyAndHeadersExclusions() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "MyBoolean" + "|" + "MyShort");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -203,18 +210,15 @@ public class CamelSinkTaskTest {
         assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testBodyAndHeadersExclusionsRegex() {
+    void testBodyAndHeadersExclusionsRegex() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "My*");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -251,17 +255,14 @@ public class CamelSinkTaskTest {
         assertNull(exchange.getIn().getHeader("MyInteger"));
         assertNull(exchange.getIn().getHeader("MyLong", Long.class));
         assertNull(exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndProperties() {
+    void testBodyAndProperties() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -294,17 +295,14 @@ public class CamelSinkTaskTest {
         assertEquals(myDouble, (Double) exchange.getProperties().get("MyDouble"));
         assertEquals(myInteger, exchange.getProperties().get("MyInteger"));
         assertEquals(myLong, (Long) exchange.getProperties().get("MyLong"));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndPropertiesHeadersMixed() {
+    void testBodyAndPropertiesHeadersMixed() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -351,17 +349,14 @@ public class CamelSinkTaskTest {
         assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class));
         assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndHeadersMap() {
+    void testBodyAndHeadersMap() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -406,16 +401,14 @@ public class CamelSinkTaskTest {
         assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class));
         assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class));
         assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class));
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndPropertiesHeadersMapMixed() {
+    void testBodyAndPropertiesHeadersMapMixed() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -480,17 +473,14 @@ public class CamelSinkTaskTest {
         assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class));
         assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class));
         assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndHeadersList() {
+    void testBodyAndHeadersList() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -531,16 +521,14 @@ public class CamelSinkTaskTest {
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(list, exchange.getIn().getHeader("MyList", List.class));
         assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class));
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndPropertiesHeadersListMixed() {
+    void testBodyAndPropertiesHeadersListMixed() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -599,12 +587,10 @@ public class CamelSinkTaskTest {
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(list, exchange.getIn().getHeader("MyList", List.class));
         assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testUrlPrecedenceOnComponentProperty() {
+    void testUrlPrecedenceOnComponentProperty() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
@@ -612,7 +598,6 @@ public class CamelSinkTaskTest {
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "endpointProperty", "shouldNotBeUsed");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk", "shouldNotBeUsed");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -624,19 +609,16 @@ public class CamelSinkTaskTest {
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testOnlyBodyUsingComponentProperty() {
+    void testOnlyBodyUsingComponentProperty() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -650,12 +632,10 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints()
             .stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true")).count());
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testOnlyBodyUsingMultipleComponentProperties() {
+    void testOnlyBodyUsingMultipleComponentProperties() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
@@ -663,7 +643,6 @@ public class CamelSinkTaskTest {
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "size", "50");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -678,19 +657,16 @@ public class CamelSinkTaskTest {
 
         assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints()
             .stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true&size=50")).count());
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() {
+    void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF, "false");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -737,18 +713,15 @@ public class CamelSinkTaskTest {
         assertFalse(exchange.getMessage().getHeaders().containsKey("MyDouble"));
         assertFalse(exchange.getMessage().getHeaders().containsKey("MyInteger"));
         assertFalse(exchange.getMessage().getHeaders().containsKey("MyLong"));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() {
+    void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -795,19 +768,16 @@ public class CamelSinkTaskTest {
         assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class));
         assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testIfExchangeFailsShouldThrowConnectException() {
+    void testIfExchangeFailsShouldThrowConnectException() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         // we use a dummy component sink in order fail the exchange delivery
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "direct");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -815,18 +785,15 @@ public class CamelSinkTaskTest {
         records.add(record);
 
         assertThrows(ConnectException.class, () -> sinkTask.put(records));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testAggregationBody() {
+    void testAggregationBody() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5");
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -850,19 +817,16 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testAggregationBodyAndTimeout() throws InterruptedException {
+    void testAggregationBodyAndTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100");
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -886,12 +850,10 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException {
+    void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
@@ -900,7 +862,6 @@ public class CamelSinkTaskTest {
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body");
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -949,18 +910,15 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testWithIdempotency() throws InterruptedException {
+    void testWithIdempotency() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body");
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -1015,19 +973,16 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testWithIdempotencyAndHeader() throws InterruptedException {
+    void testWithIdempotencyAndHeader() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency");
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -1056,12 +1011,10 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testSecretRaw() {
+    void testSecretRaw() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put("camel.sink.endpoint.secretKey", "se+ret");
@@ -1069,14 +1022,11 @@ public class CamelSinkTaskTest {
         props.put("camel.sink.endpoint.queueNameOrArn", "test");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testSecretRawReference() {
+    void testSecretRawReference() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put("camel.sink.endpoint.secretKey", "#bean:mySecretKey");
@@ -1085,53 +1035,65 @@ public class CamelSinkTaskTest {
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
         props.put("myAccessKey", "MoreSe+ret$");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndDateHeader() {
+    void testBodyAndDateHeader() {
         final Date now = new Date();
 
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
-        try {
-            List<SinkRecord> records = new ArrayList<>();
+        List<SinkRecord> records = new ArrayList<>();
 
-            SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
-            record.headers().addTimestamp(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDate", now);
-            records.add(record);
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        record.headers().addTimestamp(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDate", now);
+        records.add(record);
 
-            sinkTask.put(records);
+        sinkTask.put(records);
 
-            Exchange exchange = sinkTask.getCms().getConsumerTemplate().receive(SEDA_URI, RECEIVE_TIMEOUT);
+        Exchange exchange = sinkTask.getCms().getConsumerTemplate().receive(SEDA_URI, RECEIVE_TIMEOUT);
 
-            assertThat(exchange.getIn().getHeader("MyDate")).isInstanceOfSatisfying(Date.class, value -> {
-                assertThat(value).isEqualTo(now);
-            });
-        } finally {
-            sinkTask.stop();
-        }
+        assertThat(exchange.getIn().getHeader("MyDate")).isInstanceOfSatisfying(Date.class, value -> {
+            assertThat(value).isEqualTo(now);
+        });
     }
 
     @Test
-    public void testContentLogLevelConfiguration() {
+    void testContentLogLevelConfiguration() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, "INFO");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
         assertEquals(LoggingLevel.INFO, sinkTask.getLoggingLevel());
+    }
 
-        sinkTask.stop();
+    @Test
+    void testThatSchemalessHeaderIsBeingMappedToExchange() {
+        // given sink task
+        Map<String, String> properties = new HashMap<>();
+        properties.put(TOPIC_CONF, TOPIC_NAME);
+        properties.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        sinkTask.start(properties);
+
+        // and source record
+        String headerName = "test-header";
+        Long headerValue = 1234L;
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 0);
+        record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + headerName, headerValue, null);
+
+        // when
+        sinkTask.put(Collections.singleton(record));
+
+        // then
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertThat(exchange.getIn().getHeader(headerName)).isEqualTo(headerValue);
     }
 }


[camel-kafka-connector] 03/03: fix #1447 Camel version to 3.18.1

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 44def369610831a91c05770a2fd4a5b4ae6e7e91
Author: Jakub Malek <ja...@gmail.com>
AuthorDate: Fri Oct 14 16:01:14 2022 +0200

    fix #1447 Camel version to 3.18.1
---
 parent/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parent/pom.xml b/parent/pom.xml
index 33de26e6f..de91d4bab 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -28,7 +28,7 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 
         <kafka.version>2.8.0</kafka.version>
-        <camel.version>3.19.0-SNAPSHOT</camel.version>
+        <camel.version>3.18.1</camel.version>
         <camel.kamelet.catalog.version>0.9.0</camel.kamelet.catalog.version>
         <apicurio.registry.version>1.3.2.Final</apicurio.registry.version>
         <resteasy.version>4.5.6.Final</resteasy.version>