You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2022/10/17 09:16:35 UTC
[camel-kafka-connector] 01/03: fix #1447 Fixed NPE error during SinkTask header mapping
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 2a36be113d76a6a306a4934c2e7406b0d75f96df
Author: Jakub Malek <ja...@gmail.com>
AuthorDate: Wed Oct 12 12:32:36 2022 +0200
fix #1447 Fixed NPE error during SinkTask header mapping
---
.../apache/camel/kafkaconnector/CamelSinkTask.java | 3 +-
.../camel/kafkaconnector/CamelSinkTaskTest.java | 188 ++++++++-------------
2 files changed, 77 insertions(+), 114 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index a53f298c5..b66bce262 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -240,7 +240,8 @@ public class CamelSinkTask extends SinkTask {
final String key = StringHelper.after(header.key(), prefix, header.key());
final Schema schema = header.schema();
- if (schema.type().equals(Schema.BYTES_SCHEMA.type())
+ if (schema != null
+ && schema.type().equals(Schema.BYTES_SCHEMA.type())
&& Objects.equals(schema.name(), Decimal.LOGICAL_NAME)
&& header.value() instanceof byte[]) {
destination.put(key, Decimal.toLogical(schema, (byte[]) header.value()));
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index c08a4f74c..598d7a3d2 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.kafkaconnector;
import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -33,7 +34,10 @@ import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -43,20 +47,32 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class CamelSinkTaskTest {
+class CamelSinkTaskTest {
private static final String SEDA_URI = "seda:test";
private static final String TOPIC_NAME = "my-topic";
private static final long RECEIVE_TIMEOUT = 1_000;
private static final String TOPIC_CONF = "topics";
+ private CamelSinkTask sinkTask;
+
+ @BeforeEach
+ void setup() {
+ sinkTask = new CamelSinkTask();
+ }
+
+ @AfterEach
+ void tearDown() {
+ sinkTask.stop();
+ }
+
+
@Test
- public void testOnlyBody() {
+ void testOnlyBody() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -70,17 +86,14 @@ public class CamelSinkTaskTest {
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
- sinkTask.stop();
}
@Test
- public void testTopicsRegex() {
+ void testTopicsRegex() {
Map<String, String> props = new HashMap<>();
props.put("topics.regex", "topic1*");
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -99,17 +112,14 @@ public class CamelSinkTaskTest {
Exchange exchange1 = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("cameltopicregex", exchange1.getMessage().getBody());
assertEquals("test", exchange1.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-
- sinkTask.stop();
}
@Test
- public void testBodyAndHeaders() {
+ void testBodyAndHeaders() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
Byte myByte = new Byte("100");
@@ -155,18 +165,15 @@ public class CamelSinkTaskTest {
assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
assertEquals(kafkaBigDecimal, exchange.getIn().getHeader("KafkaBigDecimal", BigDecimal.class));
-
- sinkTask.stop();
}
@Test
- public void testBodyAndHeadersExclusions() {
+ void testBodyAndHeadersExclusions() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "MyBoolean" + "|" + "MyShort");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
Byte myByte = new Byte("100");
@@ -203,18 +210,15 @@ public class CamelSinkTaskTest {
assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
-
- sinkTask.stop();
}
@Test
- public void testBodyAndHeadersExclusionsRegex() {
+ void testBodyAndHeadersExclusionsRegex() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "My*");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
Byte myByte = new Byte("100");
@@ -251,17 +255,14 @@ public class CamelSinkTaskTest {
assertNull(exchange.getIn().getHeader("MyInteger"));
assertNull(exchange.getIn().getHeader("MyLong", Long.class));
assertNull(exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
-
- sinkTask.stop();
}
@Test
- public void testBodyAndProperties() {
+ void testBodyAndProperties() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
Byte myByte = new Byte("100");
@@ -294,17 +295,14 @@ public class CamelSinkTaskTest {
assertEquals(myDouble, (Double) exchange.getProperties().get("MyDouble"));
assertEquals(myInteger, exchange.getProperties().get("MyInteger"));
assertEquals(myLong, (Long) exchange.getProperties().get("MyLong"));
-
- sinkTask.stop();
}
@Test
- public void testBodyAndPropertiesHeadersMixed() {
+ void testBodyAndPropertiesHeadersMixed() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
Byte myByte = new Byte("100");
@@ -351,17 +349,14 @@ public class CamelSinkTaskTest {
assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class));
assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
-
- sinkTask.stop();
}
@Test
- public void testBodyAndHeadersMap() {
+ void testBodyAndHeadersMap() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
Byte myByte = new Byte("100");
@@ -406,16 +401,14 @@ public class CamelSinkTaskTest {
assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class));
assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class));
assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class));
- sinkTask.stop();
}
@Test
- public void testBodyAndPropertiesHeadersMapMixed() {
+ void testBodyAndPropertiesHeadersMapMixed() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
Byte myByte = new Byte("100");
@@ -480,17 +473,14 @@ public class CamelSinkTaskTest {
assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class));
assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class));
assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class));
-
- sinkTask.stop();
}
@Test
- public void testBodyAndHeadersList() {
+ void testBodyAndHeadersList() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
Byte myByte = new Byte("100");
@@ -531,16 +521,14 @@ public class CamelSinkTaskTest {
assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
assertEquals(list, exchange.getIn().getHeader("MyList", List.class));
assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class));
- sinkTask.stop();
}
@Test
- public void testBodyAndPropertiesHeadersListMixed() {
+ void testBodyAndPropertiesHeadersListMixed() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
Byte myByte = new Byte("100");
@@ -599,12 +587,10 @@ public class CamelSinkTaskTest {
assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
assertEquals(list, exchange.getIn().getHeader("MyList", List.class));
assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class));
-
- sinkTask.stop();
}
@Test
- public void testUrlPrecedenceOnComponentProperty() {
+ void testUrlPrecedenceOnComponentProperty() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
@@ -612,7 +598,6 @@ public class CamelSinkTaskTest {
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "endpointProperty", "shouldNotBeUsed");
props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk", "shouldNotBeUsed");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -624,19 +609,16 @@ public class CamelSinkTaskTest {
Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-
- sinkTask.stop();
}
@Test
- public void testOnlyBodyUsingComponentProperty() {
+ void testOnlyBodyUsingComponentProperty() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true");
props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -650,12 +632,10 @@ public class CamelSinkTaskTest {
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints()
.stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true")).count());
-
- sinkTask.stop();
}
@Test
- public void testOnlyBodyUsingMultipleComponentProperties() {
+ void testOnlyBodyUsingMultipleComponentProperties() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
@@ -663,7 +643,6 @@ public class CamelSinkTaskTest {
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "size", "50");
props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -678,19 +657,16 @@ public class CamelSinkTaskTest {
assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints()
.stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true&size=50")).count());
-
- sinkTask.stop();
}
@Test
- public void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() {
+ void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false");
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF, "false");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
Byte myByte = new Byte("100");
@@ -737,18 +713,15 @@ public class CamelSinkTaskTest {
assertFalse(exchange.getMessage().getHeaders().containsKey("MyDouble"));
assertFalse(exchange.getMessage().getHeaders().containsKey("MyInteger"));
assertFalse(exchange.getMessage().getHeaders().containsKey("MyLong"));
-
- sinkTask.stop();
}
@Test
- public void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() {
+ void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
Byte myByte = new Byte("100");
@@ -795,19 +768,16 @@ public class CamelSinkTaskTest {
assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class));
assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
-
- sinkTask.stop();
}
@Test
- public void testIfExchangeFailsShouldThrowConnectException() {
+ void testIfExchangeFailsShouldThrowConnectException() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
// we use a dummy component sink in order fail the exchange delivery
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "direct");
props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -815,18 +785,15 @@ public class CamelSinkTaskTest {
records.add(record);
assertThrows(ConnectException.class, () -> sinkTask.put(records));
-
- sinkTask.stop();
}
@Test
- public void testAggregationBody() {
+ void testAggregationBody() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -850,19 +817,16 @@ public class CamelSinkTaskTest {
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
- sinkTask.stop();
}
@Test
- public void testAggregationBodyAndTimeout() throws InterruptedException {
+ void testAggregationBodyAndTimeout() throws InterruptedException {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5");
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -886,12 +850,10 @@ public class CamelSinkTaskTest {
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
- sinkTask.stop();
}
@Test
- public void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException {
+ void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
@@ -900,7 +862,6 @@ public class CamelSinkTaskTest {
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100");
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -949,18 +910,15 @@ public class CamelSinkTaskTest {
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
- sinkTask.stop();
}
@Test
- public void testWithIdempotency() throws InterruptedException {
+ void testWithIdempotency() throws InterruptedException {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -1015,19 +973,16 @@ public class CamelSinkTaskTest {
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
- sinkTask.stop();
}
@Test
- public void testWithIdempotencyAndHeader() throws InterruptedException {
+ void testWithIdempotencyAndHeader() throws InterruptedException {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true");
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header");
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -1056,12 +1011,10 @@ public class CamelSinkTaskTest {
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
- sinkTask.stop();
}
@Test
- public void testSecretRaw() {
+ void testSecretRaw() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put("camel.sink.endpoint.secretKey", "se+ret");
@@ -1069,14 +1022,11 @@ public class CamelSinkTaskTest {
props.put("camel.sink.endpoint.queueNameOrArn", "test");
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
-
- sinkTask.stop();
}
@Test
- public void testSecretRawReference() {
+ void testSecretRawReference() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put("camel.sink.endpoint.secretKey", "#bean:mySecretKey");
@@ -1085,53 +1035,65 @@ public class CamelSinkTaskTest {
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
props.put("myAccessKey", "MoreSe+ret$");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
-
- sinkTask.stop();
}
@Test
- public void testBodyAndDateHeader() {
+ void testBodyAndDateHeader() {
final Date now = new Date();
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
- try {
- List<SinkRecord> records = new ArrayList<>();
+ List<SinkRecord> records = new ArrayList<>();
- SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
- record.headers().addTimestamp(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDate", now);
- records.add(record);
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ record.headers().addTimestamp(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDate", now);
+ records.add(record);
- sinkTask.put(records);
+ sinkTask.put(records);
- Exchange exchange = sinkTask.getCms().getConsumerTemplate().receive(SEDA_URI, RECEIVE_TIMEOUT);
+ Exchange exchange = sinkTask.getCms().getConsumerTemplate().receive(SEDA_URI, RECEIVE_TIMEOUT);
- assertThat(exchange.getIn().getHeader("MyDate")).isInstanceOfSatisfying(Date.class, value -> {
- assertThat(value).isEqualTo(now);
- });
- } finally {
- sinkTask.stop();
- }
+ assertThat(exchange.getIn().getHeader("MyDate")).isInstanceOfSatisfying(Date.class, value -> {
+ assertThat(value).isEqualTo(now);
+ });
}
@Test
- public void testContentLogLevelConfiguration() {
+ void testContentLogLevelConfiguration() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, "INFO");
- CamelSinkTask sinkTask = new CamelSinkTask();
sinkTask.start(props);
assertEquals(LoggingLevel.INFO, sinkTask.getLoggingLevel());
+ }
- sinkTask.stop();
+ @Test
+ void testThatSchemalessHeaderIsBeingMappedToExchange() {
+ // given sink task
+ Map<String, String> properties = new HashMap<>();
+ properties.put(TOPIC_CONF, TOPIC_NAME);
+ properties.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+ sinkTask.start(properties);
+
+ // and a sink record carrying a header that has no schema
+ String headerName = "test-header";
+ Long headerValue = 1234L;
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 0);
+ record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + headerName, headerValue, null);
+
+ // when
+ sinkTask.put(Collections.singleton(record));
+
+ // then
+ ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+ assertThat(exchange.getIn().getHeader(headerName)).isEqualTo(headerValue);
}
}