You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/01/28 08:17:16 UTC

[camel-kafka-connector] 01/01: Add a map Camel Properties to Kafka headers option to make the behavior configurable

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch 922
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit ee7dcd2c6783965052e25b09837c78c334d0c230
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Jan 28 08:55:02 2021 +0100

    Add a map Camel Properties to Kafka headers option to make the behavior configurable
---
 .../camel/kafkaconnector/CamelConnectorConfig.java |  4 ++
 .../kafkaconnector/CamelSinkConnectorConfig.java   |  3 +-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  6 ++-
 .../kafkaconnector/CamelSourceConnectorConfig.java |  3 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |  9 +++-
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 59 ++++++++++++++++++++++
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 24 +++++++++
 7 files changed, 103 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
index 00c3ce4..196b872 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -43,6 +43,10 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
     public static final String CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF = "camel.remove.headers.pattern";
     public static final String CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC = "The pattern of the headers we want to exclude from the exchange";
     
+    public static final Boolean CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT = true;
+    public static final String CAMEL_CONNECTOR_MAP_PROPERTIES_CONF = "camel.map.properties";
+    public static final String CAMEL_CONNECTOR_MAP_PROPERTIES_DOC = "If set to true, the connector will map Camel exchange properties to Kafka headers (source) and Kafka headers carrying the Camel property prefix back to exchange properties (sink).";
+    
     public static final int CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT = 0;
     public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF = "camel.error.handler.max.redeliveries";
     public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC = "The maximum redeliveries to be use in case of Default Error Handler";
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index 6350fe9..f40749d 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -67,7 +67,8 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC)
-        .define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC);
+        .define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC)
+        .define(CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_PROPERTIES_DOC);
     
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index e44676b..a1bbb1e 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -59,6 +59,7 @@ public class CamelSinkTask extends SinkTask {
     private ProducerTemplate producer;
     private Endpoint localEndpoint;
     private LoggingLevel loggingLevel = LoggingLevel.OFF;
+    private boolean mapProperties;
 
     @Override
     public String version() {
@@ -101,6 +102,7 @@ public class CamelSinkTask extends SinkTask {
             final int idempotentRepositoryKafkaMaxCacheSize = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF);
             final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF);
             final String headersRemovePattern = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
+            mapProperties = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);
             
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
@@ -173,7 +175,9 @@ public class CamelSinkTask extends SinkTask {
                 if (header.key().startsWith(HEADER_CAMEL_PREFIX)) {
                     mapHeader(header, HEADER_CAMEL_PREFIX, exchange.getMessage().getHeaders());
                 } else if (header.key().startsWith(PROPERTY_CAMEL_PREFIX)) {
-                    mapHeader(header, PROPERTY_CAMEL_PREFIX, exchange.getProperties());
+                    if (mapProperties) {
+                        mapHeader(header, PROPERTY_CAMEL_PREFIX, exchange.getProperties());
+                    }
                 }
             }
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index a703b18..6384961 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -103,7 +103,8 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC)
-        .define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC);
+        .define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC)
+        .define(CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_PROPERTIES_DOC);
     
     public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index bd3aee6..f45b78e 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -58,6 +58,7 @@ public class CamelSourceTask extends SourceTask {
     private Long maxPollDuration;
     private String camelMessageHeaderKey;
     private LoggingLevel loggingLevel = LoggingLevel.OFF;
+    private boolean mapProperties;
 
     @Override
     public String version() {
@@ -101,6 +102,7 @@ public class CamelSourceTask extends SourceTask {
             final int idempotentRepositoryKafkaMaxCacheSize = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF);
             final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF);
             final String headersRemovePattern = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
+            mapProperties = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);
             
             topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
@@ -188,8 +190,11 @@ public class CamelSourceTask extends SourceTask {
                 if (exchange.getMessage().hasHeaders()) {
                     setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
                 }
-                if (exchange.hasProperties()) {
-                    setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
+                
+                if (mapProperties) {
+                    if (exchange.hasProperties()) {
+                        setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
+                    }
                 }
 
                 TaskHelper.logRecordContent(LOG, loggingLevel, record);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 047d858..93661a7 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -37,6 +37,7 @@ import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -670,6 +671,64 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
+    
+    @Test
+    public void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false");
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+
+        Byte myByte = new Byte("100");
+        Float myFloat = new Float("100");
+        Short myShort = new Short("100");
+        Double myDouble = new Double("100");
+        int myInteger = 100;
+        Long myLong = new Long("100");
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        record.headers().addBoolean(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyBoolean", true);
+        record.headers().addByte(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyByte", myByte);
+        record.headers().addFloat(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyFloat", myFloat);
+        record.headers().addShort(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyShort", myShort);
+        record.headers().addDouble(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyDouble", myDouble);
+        record.headers().addInt(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyInteger", myInteger);
+        record.headers().addLong(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyLong", myLong);
+        record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBoolean", true);
+        record.headers().addByte(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyByte", myByte);
+        record.headers().addFloat(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyFloat", myFloat);
+        record.headers().addShort(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyShort", myShort);
+        record.headers().addDouble(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDouble", myDouble);
+        record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", myInteger);
+        record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", myLong);
+        records.add(record);
+        sinkTask.put(records);
+
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertFalse(exchange.getProperties().containsKey("MyBoolean"));
+        assertFalse(exchange.getProperties().containsKey("MyByte"));
+        assertFalse(exchange.getProperties().containsKey("MyFloat"));
+        assertFalse(exchange.getProperties().containsKey("MyShort"));
+        assertFalse(exchange.getProperties().containsKey("MyDouble"));
+        assertFalse(exchange.getProperties().containsKey("MyInteger"));
+        assertFalse(exchange.getProperties().containsKey("MyLong"));
+        assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", Float.class));
+        assertEquals(myShort, exchange.getIn().getHeader("MyShort", Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
+
+        sinkTask.stop();
+    }
 
     @Test
     public void testIfExchangeFailsShouldThrowConnectException() {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index fadbbb4..f90876e 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -331,6 +331,30 @@ public class CamelSourceTaskTest {
             sourceTask.stop();
         }
     }
+    
+    @Test
+    public void testSourceByteArrayProperty() {
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(mapOf(
+            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+            CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "direct",
+            CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false",
+            CamelSourceTask.getCamelSourcePathConfigPrefix() + "name", "start"
+        ));
+
+        sourceTask.getCms().getProducerTemplate().sendBodyAndProperty(DIRECT_URI, "test", "byteArray", new Byte[] {
+            1, 2
+        });
+
+        try {
+            List<SourceRecord> results = sourceTask.poll();
+            assertThat(results).hasSize(1);
+
+            assertEquals(0, results.get(0).headers().size());
+        } finally {
+            sourceTask.stop();
+        }
+    }
 
     @Test
     public void testSourceDateHeader() {