You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2019/12/11 16:34:11 UTC

[camel-kafka-connector] 01/01: Allow to configure a source record key from camel message header key

This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch source-record-key
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 1d49281ec988b62985aff5632a237cf576f727ab
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Wed Dec 11 17:33:49 2019 +0100

    Allow to configure a source record key from camel message header key
---
 .../kafkaconnector/CamelSourceConnectorConfig.java |  8 ++-
 .../camel/kafkaconnector/CamelSourceTask.java      | 10 +++-
 .../camel/kafkaconnector/utils/SchemaHelper.java   |  3 ++
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 62 ++++++++++++++++++++++
 4 files changed, 81 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index e8644d6..0d20d7b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -47,6 +47,11 @@ public class CamelSourceConnectorConfig extends AbstractConfig {
     public static final Boolean CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT = true;
     public static final String CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF = "camel.source.pollingConsumerBlockWhenFull";
 
+    public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT = null;
+    public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF = "camel.source.camelMessageHeaderKey";
+    public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC = "The camel message header key that contain an unique key for the message which can be used a Kafka message key. If this is not specified, then the Kafka message will not have a key";
+
+
     private static final String CAMEL_SOURCE_URL_DOC = "The camel url to configure the source";
     private static final String CAMEL_SOURCE_UNMARSHAL_DOC = "The camel dataformat name to use to unmarshal data from the source";
     private static final String TOPIC_DOC = "The topic to publish data to";
@@ -74,6 +79,7 @@ public class CamelSourceConnectorConfig extends AbstractConfig {
                 .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
                 .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC)
                 .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
-                .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC);
+                .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
+                .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC);
     }
 }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 21dce09..1011910 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -31,6 +31,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
 import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -50,6 +51,7 @@ public class CamelSourceTask extends SourceTask {
     private String topic;
     private Long maxBatchPollSize;
     private Long maxPollDuration;
+    private String camelMessageHeaderKey;
 
     @Override
     public String version() {
@@ -65,6 +67,8 @@ public class CamelSourceTask extends SourceTask {
             maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
             maxPollDuration = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF);
 
+            camelMessageHeaderKey = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF);
+
             final String remoteUrl = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF);
             final String unmarshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF);
             topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF);
@@ -107,9 +111,13 @@ public class CamelSourceTask extends SourceTask {
                 Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
                 Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
 
+                final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
                 final Object messageBodyValue = exchange.getMessage().getBody();
 
-                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, SchemaHelper.buildSchemaBuilderForType(messageBodyValue).build(), messageBodyValue);
+                final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
+                final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
+
+                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, messageKeySchema, messageHeaderKey, messageBodySchema, messageBodyValue);
                 if (exchange.getMessage().hasHeaders()) {
                     setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
                 }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
index 5c0c295..b0e08a4 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
@@ -20,6 +20,7 @@ import java.math.BigDecimal;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
@@ -38,6 +39,8 @@ public final class SchemaHelper {
      * @return {@link SchemaBuilder} instance
      */
     public static SchemaBuilder buildSchemaBuilderForType(final Object value) {
+        Objects.requireNonNull(value);
+
         // gracefully try to infer the schema
         final Schema knownSchema = Values.inferSchema(value);
 
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index e46e888..07de6b4 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 public class CamelSourceTaskTest {
@@ -62,6 +63,51 @@ public class CamelSourceTaskTest {
     }
 
     @Test
+    public void testSourcePollingWithKey() throws InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put("camel.source.url", "direct:start");
+        props.put("camel.source.kafka.topic", "mytopic");
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, "CamelSpecialTestKey");
+
+        CamelSourceTask camelSourceTask = new CamelSourceTask();
+        camelSourceTask.start(props);
+
+        final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate();
+
+        // first we test if we have a key in the message with body
+        template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", 1234);
+
+        Thread.sleep(100L);
+
+        List<SourceRecord> poll = camelSourceTask.poll();
+        assertEquals(1, poll.size());
+        assertEquals(1234, poll.get(0).key());
+        assertEquals(Schema.Type.INT32, poll.get(0).keySchema().type());
+
+        // second we test if we have no key under the header
+        template.sendBodyAndHeader("direct:start", "awesome!", "WrongHeader", 1234);
+
+        Thread.sleep(100L);
+
+        poll = camelSourceTask.poll();
+        assertEquals(1, poll.size());
+        assertNull(poll.get(0).key());
+        assertNull(poll.get(0).keySchema());
+
+        // third we test if we have the header but with null value
+        template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", null);
+
+        Thread.sleep(100L);
+
+        camelSourceTask.poll();
+        assertEquals(1, poll.size());
+        assertNull(poll.get(0).key());
+        assertNull(poll.get(0).keySchema());
+
+        camelSourceTask.stop();
+    }
+
+    @Test
     public void testSourcePollingWithBody() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put("camel.source.url", "direct:start");
@@ -81,6 +127,8 @@ public class CamelSourceTaskTest {
         assertEquals(1, poll.size());
         assertEquals("testing kafka connect", poll.get(0).value());
         assertEquals(Schema.Type.STRING, poll.get(0).valueSchema().type());
+        assertNull(poll.get(0).key());
+        assertNull(poll.get(0).keySchema());
 
         // send second data
         template.sendBody("direct:start", true);
@@ -91,6 +139,8 @@ public class CamelSourceTaskTest {
         assertEquals(1, poll.size());
         assertTrue((boolean)poll.get(0).value());
         assertEquals(Schema.Type.BOOLEAN, poll.get(0).valueSchema().type());
+        assertNull(poll.get(0).key());
+        assertNull(poll.get(0).keySchema());
 
         // second third data
         template.sendBody("direct:start", 1234L);
@@ -101,6 +151,18 @@ public class CamelSourceTaskTest {
         assertEquals(1, poll.size());
         assertEquals(1234L, poll.get(0).value());
         assertEquals(Schema.Type.INT64, poll.get(0).valueSchema().type());
+        assertNull(poll.get(0).key());
+        assertNull(poll.get(0).keySchema());
+
+        // third with null data
+        template.sendBody("direct:start", null);
+
+        Thread.sleep(100L);
+        poll = camelSourceTask.poll();
+        assertNull(poll.get(0).key());
+        assertNull(poll.get(0).keySchema());
+        assertNull(poll.get(0).value());
+        assertNull(poll.get(0).valueSchema());
 
         camelSourceTask.stop();
     }