You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2022/10/18 11:08:49 UTC

[camel-kafka-connector] 01/02: fix #1447 Fixed NPE error during SinkTask header mapping

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-kafka-connector-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit c07d4380ca9c684e6c44cecbffaa99acaab8f377
Author: Jakub Malek <ja...@gmail.com>
AuthorDate: Wed Oct 12 12:32:36 2022 +0200

    fix #1447 Fixed NPE error during SinkTask header mapping
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java |   3 +-
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 188 ++++++++-------------
 2 files changed, 77 insertions(+), 114 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index a53f298c5..b66bce262 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -240,7 +240,8 @@ public class CamelSinkTask extends SinkTask {
         final String key = StringHelper.after(header.key(), prefix, header.key());
         final Schema schema = header.schema();
 
-        if (schema.type().equals(Schema.BYTES_SCHEMA.type())
+        if (schema != null
+                && schema.type().equals(Schema.BYTES_SCHEMA.type())
                 && Objects.equals(schema.name(), Decimal.LOGICAL_NAME)
                 && header.value() instanceof byte[]) {
             destination.put(key, Decimal.toLogical(schema, (byte[]) header.value()));
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index c08a4f74c..598d7a3d2 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.kafkaconnector;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -33,7 +34,10 @@ import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.SimpleHeaderConverter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -43,20 +47,32 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class CamelSinkTaskTest {
+class CamelSinkTaskTest {
 
     private static final String SEDA_URI = "seda:test";
     private static final String TOPIC_NAME = "my-topic";
     private static final long RECEIVE_TIMEOUT = 1_000;
     private static final String TOPIC_CONF = "topics";
 
+    private CamelSinkTask sinkTask;
+
+    @BeforeEach
+    void setup() {
+        sinkTask = new CamelSinkTask();
+    }
+
+    @AfterEach
+    void tearDown() {
+        sinkTask.stop();
+    }
+
+
     @Test
-    public void testOnlyBody() {
+    void testOnlyBody() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -70,17 +86,14 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testTopicsRegex() {
+    void testTopicsRegex() {
         Map<String, String> props = new HashMap<>();
         props.put("topics.regex", "topic1*");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -99,17 +112,14 @@ public class CamelSinkTaskTest {
         Exchange exchange1 = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("cameltopicregex", exchange1.getMessage().getBody());
         assertEquals("test", exchange1.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndHeaders() {
+    void testBodyAndHeaders() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -155,18 +165,15 @@ public class CamelSinkTaskTest {
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
         assertEquals(kafkaBigDecimal, exchange.getIn().getHeader("KafkaBigDecimal", BigDecimal.class));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testBodyAndHeadersExclusions() {
+    void testBodyAndHeadersExclusions() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "MyBoolean" + "|" + "MyShort");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -203,18 +210,15 @@ public class CamelSinkTaskTest {
         assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testBodyAndHeadersExclusionsRegex() {
+    void testBodyAndHeadersExclusionsRegex() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "My*");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -251,17 +255,14 @@ public class CamelSinkTaskTest {
         assertNull(exchange.getIn().getHeader("MyInteger"));
         assertNull(exchange.getIn().getHeader("MyLong", Long.class));
         assertNull(exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndProperties() {
+    void testBodyAndProperties() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -294,17 +295,14 @@ public class CamelSinkTaskTest {
         assertEquals(myDouble, (Double) exchange.getProperties().get("MyDouble"));
         assertEquals(myInteger, exchange.getProperties().get("MyInteger"));
         assertEquals(myLong, (Long) exchange.getProperties().get("MyLong"));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndPropertiesHeadersMixed() {
+    void testBodyAndPropertiesHeadersMixed() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -351,17 +349,14 @@ public class CamelSinkTaskTest {
         assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class));
         assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndHeadersMap() {
+    void testBodyAndHeadersMap() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -406,16 +401,14 @@ public class CamelSinkTaskTest {
         assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class));
         assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class));
         assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class));
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndPropertiesHeadersMapMixed() {
+    void testBodyAndPropertiesHeadersMapMixed() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -480,17 +473,14 @@ public class CamelSinkTaskTest {
         assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class));
         assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class));
         assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndHeadersList() {
+    void testBodyAndHeadersList() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -531,16 +521,14 @@ public class CamelSinkTaskTest {
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(list, exchange.getIn().getHeader("MyList", List.class));
         assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class));
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndPropertiesHeadersListMixed() {
+    void testBodyAndPropertiesHeadersListMixed() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -599,12 +587,10 @@ public class CamelSinkTaskTest {
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(list, exchange.getIn().getHeader("MyList", List.class));
         assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testUrlPrecedenceOnComponentProperty() {
+    void testUrlPrecedenceOnComponentProperty() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
@@ -612,7 +598,6 @@ public class CamelSinkTaskTest {
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "endpointProperty", "shouldNotBeUsed");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk", "shouldNotBeUsed");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -624,19 +609,16 @@ public class CamelSinkTaskTest {
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testOnlyBodyUsingComponentProperty() {
+    void testOnlyBodyUsingComponentProperty() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -650,12 +632,10 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints()
             .stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true")).count());
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testOnlyBodyUsingMultipleComponentProperties() {
+    void testOnlyBodyUsingMultipleComponentProperties() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
@@ -663,7 +643,6 @@ public class CamelSinkTaskTest {
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "size", "50");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -678,19 +657,16 @@ public class CamelSinkTaskTest {
 
         assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints()
             .stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true&size=50")).count());
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() {
+    void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF, "false");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -737,18 +713,15 @@ public class CamelSinkTaskTest {
         assertFalse(exchange.getMessage().getHeaders().containsKey("MyDouble"));
         assertFalse(exchange.getMessage().getHeaders().containsKey("MyInteger"));
         assertFalse(exchange.getMessage().getHeaders().containsKey("MyLong"));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() {
+    void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         Byte myByte = new Byte("100");
@@ -795,19 +768,16 @@ public class CamelSinkTaskTest {
         assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class));
         assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testIfExchangeFailsShouldThrowConnectException() {
+    void testIfExchangeFailsShouldThrowConnectException() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         // we use a dummy component sink in order fail the exchange delivery
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "direct");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -815,18 +785,15 @@ public class CamelSinkTaskTest {
         records.add(record);
 
         assertThrows(ConnectException.class, () -> sinkTask.put(records));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testAggregationBody() {
+    void testAggregationBody() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5");
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -850,19 +817,16 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testAggregationBodyAndTimeout() throws InterruptedException {
+    void testAggregationBodyAndTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100");
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -886,12 +850,10 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException {
+    void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
@@ -900,7 +862,6 @@ public class CamelSinkTaskTest {
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body");
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -949,18 +910,15 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testWithIdempotency() throws InterruptedException {
+    void testWithIdempotency() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body");
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -1015,19 +973,16 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
     
     @Test
-    public void testWithIdempotencyAndHeader() throws InterruptedException {
+    void testWithIdempotencyAndHeader() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header");
         props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency");
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -1056,12 +1011,10 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testSecretRaw() {
+    void testSecretRaw() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put("camel.sink.endpoint.secretKey", "se+ret");
@@ -1069,14 +1022,11 @@ public class CamelSinkTaskTest {
         props.put("camel.sink.endpoint.queueNameOrArn", "test");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testSecretRawReference() {
+    void testSecretRawReference() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put("camel.sink.endpoint.secretKey", "#bean:mySecretKey");
@@ -1085,53 +1035,65 @@ public class CamelSinkTaskTest {
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
         props.put("myAccessKey", "MoreSe+ret$");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
-
-        sinkTask.stop();
     }
 
     @Test
-    public void testBodyAndDateHeader() {
+    void testBodyAndDateHeader() {
         final Date now = new Date();
 
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
 
-        try {
-            List<SinkRecord> records = new ArrayList<>();
+        List<SinkRecord> records = new ArrayList<>();
 
-            SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
-            record.headers().addTimestamp(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDate", now);
-            records.add(record);
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        record.headers().addTimestamp(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDate", now);
+        records.add(record);
 
-            sinkTask.put(records);
+        sinkTask.put(records);
 
-            Exchange exchange = sinkTask.getCms().getConsumerTemplate().receive(SEDA_URI, RECEIVE_TIMEOUT);
+        Exchange exchange = sinkTask.getCms().getConsumerTemplate().receive(SEDA_URI, RECEIVE_TIMEOUT);
 
-            assertThat(exchange.getIn().getHeader("MyDate")).isInstanceOfSatisfying(Date.class, value -> {
-                assertThat(value).isEqualTo(now);
-            });
-        } finally {
-            sinkTask.stop();
-        }
+        assertThat(exchange.getIn().getHeader("MyDate")).isInstanceOfSatisfying(Date.class, value -> {
+            assertThat(value).isEqualTo(now);
+        });
     }
 
     @Test
-    public void testContentLogLevelConfiguration() {
+    void testContentLogLevelConfiguration() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, "INFO");
 
-        CamelSinkTask sinkTask = new CamelSinkTask();
         sinkTask.start(props);
         assertEquals(LoggingLevel.INFO, sinkTask.getLoggingLevel());
+    }
 
-        sinkTask.stop();
+    @Test
+    void testThatSchemalessHeaderIsBeingMappedToExchange() {
+        // given sink task
+        Map<String, String> properties = new HashMap<>();
+        properties.put(TOPIC_CONF, TOPIC_NAME);
+        properties.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        sinkTask.start(properties);
+
+        // and source record
+        String headerName = "test-header";
+        Long headerValue = 1234L;
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 0);
+        record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + headerName, headerValue, null);
+
+        // when
+        sinkTask.put(Collections.singleton(record));
+
+        // then
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertThat(exchange.getIn().getHeader(headerName)).isEqualTo(headerValue);
     }
 }