You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2022/10/18 11:08:48 UTC

[camel-kafka-connector] branch camel-kafka-connector-3.18.x updated (af2c7e28d -> 9705e8f3c)

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a change to branch camel-kafka-connector-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


    from af2c7e28d Regen
     new c07d4380c fix #1447 Fixed NPE error during SinkTask header mapping
     new 9705e8f3c fix #1447 Unused import removal

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/kafkaconnector/CamelSinkTask.java |   3 +-
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 187 ++++++++-------------
 2 files changed, 76 insertions(+), 114 deletions(-)


[camel-kafka-connector] 01/02: fix #1447 Fixed NPE error during SinkTask header mapping

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-kafka-connector-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit c07d4380ca9c684e6c44cecbffaa99acaab8f377
Author: Jakub Malek <ja...@gmail.com>
AuthorDate: Wed Oct 12 12:32:36 2022 +0200

    fix #1447 Fixed NPE error during SinkTask header mapping
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java |   3 +-
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 188 ++++++++-------------
 2 files changed, 77 insertions(+), 114 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index a53f298c5..b66bce262 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -240,7 +240,8 @@ public class CamelSinkTask extends SinkTask {
         final String key = StringHelper.after(header.key(), prefix, header.key());
         final Schema schema = header.schema();
 
-        if (schema.type().equals(Schema.BYTES_SCHEMA.type())
+        if (schema != null
+                && schema.type().equals(Schema.BYTES_SCHEMA.type())
                 && Objects.equals(schema.name(), Decimal.LOGICAL_NAME)
                 && header.value() instanceof byte[]) {
             destination.put(key, Decimal.toLogical(schema, (byte[]) header.value()));
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index c08a4f74c..598d7a3d2 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.kafkaconnector;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -33,7 +34,10 @@ import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.SimpleHeaderConverter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -43,20 +47,32 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class CamelSinkTaskTest {
+class CamelSinkTaskTest {
 
     private static final String SEDA_URI = "seda:test";
     private static final String TOPIC_NAME = "my-topic";
     private static final long RECEIVE_TIMEOUT = 1_000;
     private static final String TOPIC_CONF = "topics";
 
+    private CamelSinkTask sinkTask;
+
+    @BeforeEach
+    void setup() {
+        sinkTask = new CamelSinkTask();
+    }
+
+    @AfterEach
+    void tearDown() {
+        sinkTask.stop();
+    }
+
+
     @Test
-    public void testOnlyBody() {
+    void testOnlyBody() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -70,17 +86,14 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testTopicsRegex() {
+    void testTopicsRegex() {
         Map<String, String> props = new HashMap<>();
         props.put("topics.regex", "topic1*");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -99,17 +112,14 @@ public class CamelSinkTaskTest {
         Exchange exchange1 = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("cameltopicregex", exchange1.getMessage().getBody());
         assertEquals("test", exchange1.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndHeaders() {
+    void testBodyAndHeaders() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -155,18 +165,15 @@ public class CamelSinkTaskTest {
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
         assertEquals(kafkaBigDecimal, exchange.getIn().getHeader("KafkaBigDecimal", BigDecimal.class));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testBodyAndHeadersExclusions() {
+    void testBodyAndHeadersExclusions() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "MyBoolean" + "|" + "MyShort");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -203,18 +210,15 @@ public class CamelSinkTaskTest {
         assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testBodyAndHeadersExclusionsRegex() {
+    void testBodyAndHeadersExclusionsRegex() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "My*");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -251,17 +255,14 @@ public class CamelSinkTaskTest {
         assertNull(exchange.getIn().getHeader("MyInteger"));
         assertNull(exchange.getIn().getHeader("MyLong", Long.class));
         assertNull(exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndProperties() {
+    void testBodyAndProperties() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -294,17 +295,14 @@ public class CamelSinkTaskTest {
         assertEquals(myDouble, (Double) exchange.getProperties().get("MyDouble"));
         assertEquals(myInteger, exchange.getProperties().get("MyInteger"));
         assertEquals(myLong, (Long) exchange.getProperties().get("MyLong"));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndPropertiesHeadersMixed() {
+    void testBodyAndPropertiesHeadersMixed() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -351,17 +349,14 @@ public class CamelSinkTaskTest {
         assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class));
         assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndHeadersMap() {
+    void testBodyAndHeadersMap() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -406,16 +401,14 @@ public class CamelSinkTaskTest {
         assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class));
         assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class));
         assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class));
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndPropertiesHeadersMapMixed() {
+    void testBodyAndPropertiesHeadersMapMixed() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -480,17 +473,14 @@ public class CamelSinkTaskTest {
         assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class));
         assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class));
         assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndHeadersList() {
+    void testBodyAndHeadersList() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -531,16 +521,14 @@ public class CamelSinkTaskTest {
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(list, exchange.getIn().getHeader("MyList", List.class));
         assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class));
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndPropertiesHeadersListMixed() {
+    void testBodyAndPropertiesHeadersListMixed() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -599,12 +587,10 @@ public class CamelSinkTaskTest {
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(list, exchange.getIn().getHeader("MyList", List.class));
         assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testUrlPrecedenceOnComponentProperty() {
+    void testUrlPrecedenceOnComponentProperty() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
@@ -612,7 +598,6 @@ public class CamelSinkTaskTest {
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "endpointProperty", "shouldNotBeUsed");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk", "shouldNotBeUsed");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -624,19 +609,16 @@ public class CamelSinkTaskTest {
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testOnlyBodyUsingComponentProperty() {
+    void testOnlyBodyUsingComponentProperty() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -650,12 +632,10 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints()
             .stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true")).count());
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testOnlyBodyUsingMultipleComponentProperties() {
+    void testOnlyBodyUsingMultipleComponentProperties() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
@@ -663,7 +643,6 @@ public class CamelSinkTaskTest {
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "size", "50");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -678,19 +657,16 @@ public class CamelSinkTaskTest {
 
         assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints()
             .stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true&size=50")).count());
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() {
+    void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF, "false");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -737,18 +713,15 @@ public class CamelSinkTaskTest {
         assertFalse(exchange.getMessage().getHeaders().containsKey("MyDouble"));
         assertFalse(exchange.getMessage().getHeaders().containsKey("MyInteger"));
         assertFalse(exchange.getMessage().getHeaders().containsKey("MyLong"));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() {
+    void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -795,19 +768,16 @@ public class CamelSinkTaskTest {
         assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class));
         assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testIfExchangeFailsShouldThrowConnectException() {
+    void testIfExchangeFailsShouldThrowConnectException() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         // we use a dummy component sink in order fail the exchange delivery
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "direct");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -815,18 +785,15 @@ public class CamelSinkTaskTest {
         records.add(record);
 
         assertThrows(ConnectException.class, () -> sinkTask.put(records));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testAggregationBody() {
+    void testAggregationBody() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5");
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -850,19 +817,16 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testAggregationBodyAndTimeout() throws InterruptedException {
+    void testAggregationBodyAndTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100");
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -886,12 +850,10 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException {
+    void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
@@ -900,7 +862,6 @@ public class CamelSinkTaskTest {
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body");
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -949,18 +910,15 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testWithIdempotency() throws InterruptedException {
+    void testWithIdempotency() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body");
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -1015,19 +973,16 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testWithIdempotencyAndHeader() throws InterruptedException {
+    void testWithIdempotencyAndHeader() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency");
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -1056,12 +1011,10 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testSecretRaw() {
+    void testSecretRaw() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put("camel.sink.endpoint.secretKey", "se+ret");
@@ -1069,14 +1022,11 @@ public class CamelSinkTaskTest {
         props.put("camel.sink.endpoint.queueNameOrArn", "test");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testSecretRawReference() {
+    void testSecretRawReference() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put("camel.sink.endpoint.secretKey", "#bean:mySecretKey");
@@ -1085,53 +1035,65 @@ public class CamelSinkTaskTest {
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
         props.put("myAccessKey", "MoreSe+ret$");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndDateHeader() {
+    void testBodyAndDateHeader() {
         final Date now = new Date();
 
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
-        try {
-            List<SinkRecord> records = new ArrayList<>();
+        List<SinkRecord> records = new ArrayList<>();
 
-            SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
-            record.headers().addTimestamp(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDate", now);
-            records.add(record);
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        record.headers().addTimestamp(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDate", now);
+        records.add(record);
 
-            sinkTask.put(records);
+        sinkTask.put(records);
 
-            Exchange exchange = sinkTask.getCms().getConsumerTemplate().receive(SEDA_URI, RECEIVE_TIMEOUT);
+        Exchange exchange = sinkTask.getCms().getConsumerTemplate().receive(SEDA_URI, RECEIVE_TIMEOUT);
 
-            assertThat(exchange.getIn().getHeader("MyDate")).isInstanceOfSatisfying(Date.class, value -> {
-                assertThat(value).isEqualTo(now);
-            });
-        } finally {
-            sinkTask.stop();
-        }
+        assertThat(exchange.getIn().getHeader("MyDate")).isInstanceOfSatisfying(Date.class, value -> {
+            assertThat(value).isEqualTo(now);
+        });
     }
 
     @Test
-    public void testContentLogLevelConfiguration() {
+    void testContentLogLevelConfiguration() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, "INFO");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
         assertEquals(LoggingLevel.INFO, sinkTask.getLoggingLevel());
+    }
 
-        sinkTask.stop();
+    @Test
+    void testThatSchemalessHeaderIsBeingMappedToExchange() {
+        // given sink task
+        Map<String, String> properties = new HashMap<>();
+        properties.put(TOPIC_CONF, TOPIC_NAME);
+        properties.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        sinkTask.start(properties);
+
+        // and source record
+        String headerName = "test-header";
+        Long headerValue = 1234L;
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 0);
+        record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + headerName, headerValue, null);
+
+        // when
+        sinkTask.put(Collections.singleton(record));
+
+        // then
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertThat(exchange.getIn().getHeader(headerName)).isEqualTo(headerValue);
     }
 }


[camel-kafka-connector] 02/02: fix #1447 Unused import removal

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-kafka-connector-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 9705e8f3c95dd1ec5511957b1a267de629e67ce9
Author: Jakub Malek <ja...@webfleet.com>
AuthorDate: Wed Oct 12 12:41:30 2022 +0200

    fix #1447 Unused import removal
---
 .../src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 598d7a3d2..eafcd45e1 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -34,7 +34,6 @@ import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.SimpleHeaderConverter;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;