You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/01/29 07:56:53 UTC

[camel-kafka-connector] branch 923 created (now fb72485)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch 923
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


      at fb72485  Add a map Camel Headers to Kafka headers option to make the behavior configurable

This branch includes the following new commits:

     new fb72485  Add a map Camel Headers to Kafka headers option to make the behavior configurable

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector] 01/01: Add a map Camel Headers to Kafka headers option to make the behavior configurable

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch 923
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit fb7248579af1e62c8510693bbfa1041c4ece5961
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Jan 29 08:56:29 2021 +0100

    Add a map Camel Headers to Kafka headers option to make the behavior configurable
---
 .../camel/kafkaconnector/CamelConnectorConfig.java |  4 ++
 .../kafkaconnector/CamelSinkConnectorConfig.java   |  3 +-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  6 ++-
 .../kafkaconnector/CamelSourceConnectorConfig.java |  3 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |  8 ++-
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 59 ++++++++++++++++++++++
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 25 +++++++++
 7 files changed, 103 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
index 196b872..11598a3 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -47,6 +47,10 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
     public static final String CAMEL_CONNECTOR_MAP_PROPERTIES_CONF = "camel.map.properties";
     public static final String CAMEL_CONNECTOR_MAP_PROPERTIES_DOC = "If set to true, the connector will transform the exchange properties into kafka headers.";
     
+    public static final Boolean CAMEL_CONNECTOR_MAP_HEADERS_DEFAULT = true;
+    public static final String CAMEL_CONNECTOR_MAP_HEADERS_CONF = "camel.map.headers";
+    public static final String CAMEL_CONNECTOR_MAP_HEADERS_DOC = "If set to true, the connector will transform the camel exchange headers into kafka headers.";
+    
     public static final int CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT = 0;
     public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF = "camel.error.handler.max.redeliveries";
     public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC = "The maximum redeliveries to be use in case of Default Error Handler";
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index f40749d..1ef5a86 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -68,7 +68,8 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC)
         .define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC)
-        .define(CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_PROPERTIES_DOC);
+        .define(CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_PROPERTIES_DOC)
+        .define(CAMEL_CONNECTOR_MAP_HEADERS_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_HEADERS_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_HEADERS_DOC);
     
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index c94ad15..5b97806 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -60,6 +60,7 @@ public class CamelSinkTask extends SinkTask {
     private Endpoint localEndpoint;
     private LoggingLevel loggingLevel = LoggingLevel.OFF;
     private boolean mapProperties;
+    private boolean mapHeaders;
 
     @Override
     public String version() {
@@ -103,6 +104,7 @@ public class CamelSinkTask extends SinkTask {
             final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF);
             final String headersRemovePattern = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
             mapProperties = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);
+            mapHeaders = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF);
             
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
@@ -173,7 +175,9 @@ public class CamelSinkTask extends SinkTask {
 
             for (Header header : record.headers()) {
                 if (header.key().startsWith(HEADER_CAMEL_PREFIX)) {
-                    mapHeader(header, HEADER_CAMEL_PREFIX, exchange.getMessage().getHeaders());
+                	if (mapHeaders) {
+                        mapHeader(header, HEADER_CAMEL_PREFIX, exchange.getMessage().getHeaders());
+                	}
                 } else if (header.key().startsWith(PROPERTY_CAMEL_PREFIX)) {
                     if (mapProperties) {
                         mapHeader(header, PROPERTY_CAMEL_PREFIX, exchange.getProperties());
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 6384961..bb4f8f8 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -104,7 +104,8 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC)
         .define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC)
-        .define(CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_PROPERTIES_DOC);;
+        .define(CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_PROPERTIES_DOC)
+        .define(CAMEL_CONNECTOR_MAP_HEADERS_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_HEADERS_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_HEADERS_DOC);
     
     public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index f45b78e..541626f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -59,6 +59,7 @@ public class CamelSourceTask extends SourceTask {
     private String camelMessageHeaderKey;
     private LoggingLevel loggingLevel = LoggingLevel.OFF;
     private boolean mapProperties;
+	private boolean mapHeaders;
 
     @Override
     public String version() {
@@ -103,6 +104,7 @@ public class CamelSourceTask extends SourceTask {
             final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF);
             final String headersRemovePattern = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
             mapProperties = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);
+            mapHeaders = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF);
             
             topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
@@ -187,8 +189,10 @@ public class CamelSourceTask extends SourceTask {
                 SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema,
                         messageHeaderKey, messageBodySchema, messageBodyValue, timestamp);
 
-                if (exchange.getMessage().hasHeaders()) {
-                    setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
+                if (mapHeaders) {
+                    if (exchange.getMessage().hasHeaders()) {
+                        setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
+                    }
                 }
                 
                 if (mapProperties) {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 93661a7..5aaca7f 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -673,6 +673,65 @@ public class CamelSinkTaskTest {
     }
     
     @Test
+    public void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false");
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF, "false");
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+
+        Byte myByte = new Byte("100");
+        Float myFloat = new Float("100");
+        Short myShort = new Short("100");
+        Double myDouble = new Double("100");
+        int myInteger = 100;
+        Long myLong = new Long("100");
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        record.headers().addBoolean(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyBoolean", true);
+        record.headers().addByte(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyByte", myByte);
+        record.headers().addFloat(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyFloat", myFloat);
+        record.headers().addShort(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyShort", myShort);
+        record.headers().addDouble(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyDouble", myDouble);
+        record.headers().addInt(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyInteger", myInteger);
+        record.headers().addLong(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyLong", myLong);
+        record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBoolean", true);
+        record.headers().addByte(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyByte", myByte);
+        record.headers().addFloat(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyFloat", myFloat);
+        record.headers().addShort(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyShort", myShort);
+        record.headers().addDouble(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDouble", myDouble);
+        record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", myInteger);
+        record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", myLong);
+        records.add(record);
+        sinkTask.put(records);
+
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertFalse(exchange.getProperties().containsKey("MyBoolean"));
+        assertFalse(exchange.getProperties().containsKey("MyByte"));
+        assertFalse(exchange.getProperties().containsKey("MyFloat"));
+        assertFalse(exchange.getProperties().containsKey("MyShort"));
+        assertFalse(exchange.getProperties().containsKey("MyDouble"));
+        assertFalse(exchange.getProperties().containsKey("MyInteger"));
+        assertFalse(exchange.getProperties().containsKey("MyLong"));
+        assertFalse(exchange.getMessage().getHeaders().containsKey("MyBoolean"));
+        assertFalse(exchange.getMessage().getHeaders().containsKey("MyByte"));
+        assertFalse(exchange.getMessage().getHeaders().containsKey("MyFloat"));
+        assertFalse(exchange.getMessage().getHeaders().containsKey("MyShort"));
+        assertFalse(exchange.getMessage().getHeaders().containsKey("MyDouble"));
+        assertFalse(exchange.getMessage().getHeaders().containsKey("MyInteger"));
+        assertFalse(exchange.getMessage().getHeaders().containsKey("MyLong"));
+
+        sinkTask.stop();
+    }
+    
+    @Test
     public void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() {
         Map<String, String> props = new HashMap<>();
         props.put(TOPIC_CONF, TOPIC_NAME);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index f90876e..b2a7c4e 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -355,6 +355,31 @@ public class CamelSourceTaskTest {
             sourceTask.stop();
         }
     }
+    
+    @Test
+    public void testSourceByteArrayHeaderMapping() {
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(mapOf(
+            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+            CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "direct",
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF, "false",
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false",
+            CamelSourceTask.getCamelSourcePathConfigPrefix() + "name", "start"
+        ));
+
+        sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "test", "byteArray", new Byte[] {
+            1, 2
+        });
+
+        try {
+            List<SourceRecord> results = sourceTask.poll();
+            assertThat(results).hasSize(1);
+
+            assertEquals(0, results.get(0).headers().size());
+        } finally {
+            sourceTask.stop();
+        }
+    }
 
     @Test
     public void testSourceDateHeader() {