You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/01/29 07:56:54 UTC
[camel-kafka-connector] 01/01: Add a map Camel Headers to Kafka headers option to make the behavior configurable
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch 923
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit fb7248579af1e62c8510693bbfa1041c4ece5961
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Jan 29 08:56:29 2021 +0100
Add a map Camel Headers to Kafka headers option to make the behavior configurable
---
.../camel/kafkaconnector/CamelConnectorConfig.java | 4 ++
.../kafkaconnector/CamelSinkConnectorConfig.java | 3 +-
.../apache/camel/kafkaconnector/CamelSinkTask.java | 6 ++-
.../kafkaconnector/CamelSourceConnectorConfig.java | 3 +-
.../camel/kafkaconnector/CamelSourceTask.java | 8 ++-
.../camel/kafkaconnector/CamelSinkTaskTest.java | 59 ++++++++++++++++++++++
.../camel/kafkaconnector/CamelSourceTaskTest.java | 25 +++++++++
7 files changed, 103 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
index 196b872..11598a3 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -47,6 +47,10 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
public static final String CAMEL_CONNECTOR_MAP_PROPERTIES_CONF = "camel.map.properties";
public static final String CAMEL_CONNECTOR_MAP_PROPERTIES_DOC = "If set to true, the connector will transform the exchange properties into kafka headers.";
+ public static final Boolean CAMEL_CONNECTOR_MAP_HEADERS_DEFAULT = true;
+ public static final String CAMEL_CONNECTOR_MAP_HEADERS_CONF = "camel.map.headers";
+ public static final String CAMEL_CONNECTOR_MAP_HEADERS_DOC = "If set to true, the connector will transform the camel exchange headers into kafka headers.";
+
public static final int CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT = 0;
public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF = "camel.error.handler.max.redeliveries";
public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC = "The maximum redeliveries to be use in case of Default Error Handler";
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index f40749d..1ef5a86 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -68,7 +68,8 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig {
.define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC)
.define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC)
.define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC)
- .define(CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_PROPERTIES_DOC);
+ .define(CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_PROPERTIES_DOC)
+ .define(CAMEL_CONNECTOR_MAP_HEADERS_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_HEADERS_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_HEADERS_DOC);
public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index c94ad15..5b97806 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -60,6 +60,7 @@ public class CamelSinkTask extends SinkTask {
private Endpoint localEndpoint;
private LoggingLevel loggingLevel = LoggingLevel.OFF;
private boolean mapProperties;
+ private boolean mapHeaders;
@Override
public String version() {
@@ -103,6 +104,7 @@ public class CamelSinkTask extends SinkTask {
final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF);
final String headersRemovePattern = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
mapProperties = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);
+ mapHeaders = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF);
CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
@@ -173,7 +175,9 @@ public class CamelSinkTask extends SinkTask {
for (Header header : record.headers()) {
if (header.key().startsWith(HEADER_CAMEL_PREFIX)) {
- mapHeader(header, HEADER_CAMEL_PREFIX, exchange.getMessage().getHeaders());
+ if (mapHeaders) {
+ mapHeader(header, HEADER_CAMEL_PREFIX, exchange.getMessage().getHeaders());
+ }
} else if (header.key().startsWith(PROPERTY_CAMEL_PREFIX)) {
if (mapProperties) {
mapHeader(header, PROPERTY_CAMEL_PREFIX, exchange.getProperties());
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 6384961..bb4f8f8 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -104,7 +104,8 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
.define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC)
.define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC)
.define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC)
- .define(CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_PROPERTIES_DOC);;
+ .define(CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_PROPERTIES_DOC)
+ .define(CAMEL_CONNECTOR_MAP_HEADERS_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_HEADERS_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_HEADERS_DOC);
public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index f45b78e..541626f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -59,6 +59,7 @@ public class CamelSourceTask extends SourceTask {
private String camelMessageHeaderKey;
private LoggingLevel loggingLevel = LoggingLevel.OFF;
private boolean mapProperties;
+ private boolean mapHeaders;
@Override
public String version() {
@@ -103,6 +104,7 @@ public class CamelSourceTask extends SourceTask {
final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF);
final String headersRemovePattern = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
mapProperties = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);
+ mapHeaders = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF);
topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
@@ -187,8 +189,10 @@ public class CamelSourceTask extends SourceTask {
SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema,
messageHeaderKey, messageBodySchema, messageBodyValue, timestamp);
- if (exchange.getMessage().hasHeaders()) {
- setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
+ if (mapHeaders) {
+ if (exchange.getMessage().hasHeaders()) {
+ setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
+ }
}
if (mapProperties) {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 93661a7..5aaca7f 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -673,6 +673,65 @@ public class CamelSinkTaskTest {
}
@Test
+ public void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() {
+ Map<String, String> props = new HashMap<>();
+ props.put(TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false");
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF, "false");
+
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
+
+ Byte myByte = new Byte("100");
+ Float myFloat = new Float("100");
+ Short myShort = new Short("100");
+ Double myDouble = new Double("100");
+ int myInteger = 100;
+ Long myLong = new Long("100");
+
+ List<SinkRecord> records = new ArrayList<SinkRecord>();
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ record.headers().addBoolean(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyBoolean", true);
+ record.headers().addByte(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyByte", myByte);
+ record.headers().addFloat(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyFloat", myFloat);
+ record.headers().addShort(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyShort", myShort);
+ record.headers().addDouble(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyDouble", myDouble);
+ record.headers().addInt(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyInteger", myInteger);
+ record.headers().addLong(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyLong", myLong);
+ record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBoolean", true);
+ record.headers().addByte(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyByte", myByte);
+ record.headers().addFloat(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyFloat", myFloat);
+ record.headers().addShort(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyShort", myShort);
+ record.headers().addDouble(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDouble", myDouble);
+ record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", myInteger);
+ record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", myLong);
+ records.add(record);
+ sinkTask.put(records);
+
+ ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+ assertEquals("camel", exchange.getMessage().getBody());
+ assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertFalse(exchange.getProperties().containsKey("MyBoolean"));
+ assertFalse(exchange.getProperties().containsKey("MyByte"));
+ assertFalse(exchange.getProperties().containsKey("MyFloat"));
+ assertFalse(exchange.getProperties().containsKey("MyShort"));
+ assertFalse(exchange.getProperties().containsKey("MyDouble"));
+ assertFalse(exchange.getProperties().containsKey("MyInteger"));
+ assertFalse(exchange.getProperties().containsKey("MyLong"));
+ assertFalse(exchange.getMessage().getHeaders().containsKey("MyBoolean"));
+ assertFalse(exchange.getMessage().getHeaders().containsKey("MyByte"));
+ assertFalse(exchange.getMessage().getHeaders().containsKey("MyFloat"));
+ assertFalse(exchange.getMessage().getHeaders().containsKey("MyShort"));
+ assertFalse(exchange.getMessage().getHeaders().containsKey("MyDouble"));
+ assertFalse(exchange.getMessage().getHeaders().containsKey("MyInteger"));
+ assertFalse(exchange.getMessage().getHeaders().containsKey("MyLong"));
+
+ sinkTask.stop();
+ }
+
+ @Test
public void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() {
Map<String, String> props = new HashMap<>();
props.put(TOPIC_CONF, TOPIC_NAME);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index f90876e..b2a7c4e 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -355,6 +355,31 @@ public class CamelSourceTaskTest {
sourceTask.stop();
}
}
+
+ @Test
+ public void testSourceByteArrayHeaderMapping() {
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(mapOf(
+ CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+ CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "direct",
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF, "false",
+ CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false",
+ CamelSourceTask.getCamelSourcePathConfigPrefix() + "name", "start"
+ ));
+
+ sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "test", "byteArray", new Byte[] {
+ 1, 2
+ });
+
+ try {
+ List<SourceRecord> results = sourceTask.poll();
+ assertThat(results).hasSize(1);
+
+ assertEquals(0, results.get(0).headers().size());
+ } finally {
+ sourceTask.stop();
+ }
+ }
@Test
public void testSourceDateHeader() {