You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2019/12/11 16:34:10 UTC

[camel-kafka-connector] branch source-record-key created (now 1d49281)

This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a change to branch source-record-key
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


      at 1d49281  Allow to configure a source record key from camel message header key

This branch includes the following new commits:

     new 1d49281  Allow to configure a source record key from camel message header key

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector] 01/01: Allow to configure a source record key from camel message header key

Posted by oa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch source-record-key
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 1d49281ec988b62985aff5632a237cf576f727ab
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Wed Dec 11 17:33:49 2019 +0100

    Allow to configure a source record key from camel message header key
---
 .../kafkaconnector/CamelSourceConnectorConfig.java |  8 ++-
 .../camel/kafkaconnector/CamelSourceTask.java      | 10 +++-
 .../camel/kafkaconnector/utils/SchemaHelper.java   |  3 ++
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 62 ++++++++++++++++++++++
 4 files changed, 81 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index e8644d6..0d20d7b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -47,6 +47,11 @@ public class CamelSourceConnectorConfig extends AbstractConfig {
     public static final Boolean CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT = true;
     public static final String CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF = "camel.source.pollingConsumerBlockWhenFull";
 
+    public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT = null;
+    public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF = "camel.source.camelMessageHeaderKey";
+    public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC = "The camel message header key that contain an unique key for the message which can be used a Kafka message key. If this is not specified, then the Kafka message will not have a key";
+
+
     private static final String CAMEL_SOURCE_URL_DOC = "The camel url to configure the source";
     private static final String CAMEL_SOURCE_UNMARSHAL_DOC = "The camel dataformat name to use to unmarshal data from the source";
     private static final String TOPIC_DOC = "The topic to publish data to";
@@ -74,6 +79,7 @@ public class CamelSourceConnectorConfig extends AbstractConfig {
                 .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
                 .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC)
                 .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
-                .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC);
+                .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
+                .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC);
     }
 }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 21dce09..1011910 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -31,6 +31,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
 import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -50,6 +51,7 @@ public class CamelSourceTask extends SourceTask {
     private String topic;
     private Long maxBatchPollSize;
     private Long maxPollDuration;
+    private String camelMessageHeaderKey;
 
     @Override
     public String version() {
@@ -65,6 +67,8 @@ public class CamelSourceTask extends SourceTask {
             maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
             maxPollDuration = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF);
 
+            camelMessageHeaderKey = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF);
+
             final String remoteUrl = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF);
             final String unmarshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF);
             topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF);
@@ -107,9 +111,13 @@ public class CamelSourceTask extends SourceTask {
                 Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
                 Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
 
+                final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
                 final Object messageBodyValue = exchange.getMessage().getBody();
 
-                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, SchemaHelper.buildSchemaBuilderForType(messageBodyValue).build(), messageBodyValue);
+                final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
+                final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
+
+                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, messageKeySchema, messageHeaderKey, messageBodySchema, messageBodyValue);
                 if (exchange.getMessage().hasHeaders()) {
                     setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
                 }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
index 5c0c295..b0e08a4 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
@@ -20,6 +20,7 @@ import java.math.BigDecimal;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
@@ -38,6 +39,8 @@ public final class SchemaHelper {
      * @return {@link SchemaBuilder} instance
      */
     public static SchemaBuilder buildSchemaBuilderForType(final Object value) {
+        Objects.requireNonNull(value);
+
         // gracefully try to infer the schema
         final Schema knownSchema = Values.inferSchema(value);
 
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index e46e888..07de6b4 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 public class CamelSourceTaskTest {
@@ -62,6 +63,51 @@ public class CamelSourceTaskTest {
     }
 
     @Test
+    public void testSourcePollingWithKey() throws InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put("camel.source.url", "direct:start");
+        props.put("camel.source.kafka.topic", "mytopic");
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, "CamelSpecialTestKey");
+
+        CamelSourceTask camelSourceTask = new CamelSourceTask();
+        camelSourceTask.start(props);
+
+        final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate();
+
+        // first: the configured header carries a value, so the record must expose it as key with a matching schema
+        template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", 1234);
+
+        Thread.sleep(100L);
+
+        List<SourceRecord> poll = camelSourceTask.poll();
+        assertEquals(1, poll.size());
+        assertEquals(1234, poll.get(0).key());
+        assertEquals(Schema.Type.INT32, poll.get(0).keySchema().type());
+
+        // second: the value is sent under a different header, so the record must have neither key nor key schema
+        template.sendBodyAndHeader("direct:start", "awesome!", "WrongHeader", 1234);
+
+        Thread.sleep(100L);
+
+        poll = camelSourceTask.poll();
+        assertEquals(1, poll.size());
+        assertNull(poll.get(0).key());
+        assertNull(poll.get(0).keySchema());
+
+        // third: the configured header is present but null; re-assign poll so the fresh record is what gets checked
+        template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", null);
+
+        Thread.sleep(100L);
+
+        poll = camelSourceTask.poll();
+        assertEquals(1, poll.size());
+        assertNull(poll.get(0).key());
+        assertNull(poll.get(0).keySchema());
+
+        camelSourceTask.stop();
+    }
+
+    @Test
     public void testSourcePollingWithBody() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put("camel.source.url", "direct:start");
@@ -81,6 +127,8 @@ public class CamelSourceTaskTest {
         assertEquals(1, poll.size());
         assertEquals("testing kafka connect", poll.get(0).value());
         assertEquals(Schema.Type.STRING, poll.get(0).valueSchema().type());
+        assertNull(poll.get(0).key());
+        assertNull(poll.get(0).keySchema());
 
         // send second data
         template.sendBody("direct:start", true);
@@ -91,6 +139,8 @@ public class CamelSourceTaskTest {
         assertEquals(1, poll.size());
         assertTrue((boolean)poll.get(0).value());
         assertEquals(Schema.Type.BOOLEAN, poll.get(0).valueSchema().type());
+        assertNull(poll.get(0).key());
+        assertNull(poll.get(0).keySchema());
 
         // second third data
         template.sendBody("direct:start", 1234L);
@@ -101,6 +151,18 @@ public class CamelSourceTaskTest {
         assertEquals(1, poll.size());
         assertEquals(1234L, poll.get(0).value());
         assertEquals(Schema.Type.INT64, poll.get(0).valueSchema().type());
+        assertNull(poll.get(0).key());
+        assertNull(poll.get(0).keySchema());
+
+        // third with null data
+        template.sendBody("direct:start", null);
+
+        Thread.sleep(100L);
+        poll = camelSourceTask.poll();
+        assertNull(poll.get(0).key());
+        assertNull(poll.get(0).keySchema());
+        assertNull(poll.get(0).value());
+        assertNull(poll.get(0).valueSchema());
 
         camelSourceTask.stop();
     }