You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2019/12/11 16:34:11 UTC
[camel-kafka-connector] 01/01: Allow to configure a source record
key from camel message header key
This is an automated email from the ASF dual-hosted git repository.
oalsafi pushed a commit to branch source-record-key
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 1d49281ec988b62985aff5632a237cf576f727ab
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Wed Dec 11 17:33:49 2019 +0100
Allow to configure a source record key from camel message header key
---
.../kafkaconnector/CamelSourceConnectorConfig.java | 8 ++-
.../camel/kafkaconnector/CamelSourceTask.java | 10 +++-
.../camel/kafkaconnector/utils/SchemaHelper.java | 3 ++
.../camel/kafkaconnector/CamelSourceTaskTest.java | 62 ++++++++++++++++++++++
4 files changed, 81 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index e8644d6..0d20d7b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -47,6 +47,11 @@ public class CamelSourceConnectorConfig extends AbstractConfig {
public static final Boolean CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT = true;
public static final String CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF = "camel.source.pollingConsumerBlockWhenFull";
+ public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT = null;
+ public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF = "camel.source.camelMessageHeaderKey";
+ public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC = "The camel message header key that contain an unique key for the message which can be used a Kafka message key. If this is not specified, then the Kafka message will not have a key";
+
+
private static final String CAMEL_SOURCE_URL_DOC = "The camel url to configure the source";
private static final String CAMEL_SOURCE_UNMARSHAL_DOC = "The camel dataformat name to use to unmarshal data from the source";
private static final String TOPIC_DOC = "The topic to publish data to";
@@ -74,6 +79,7 @@ public class CamelSourceConnectorConfig extends AbstractConfig {
.define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
.define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC)
.define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
- .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC);
+ .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
+ .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC);
}
}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 21dce09..1011910 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -31,6 +31,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -50,6 +51,7 @@ public class CamelSourceTask extends SourceTask {
private String topic;
private Long maxBatchPollSize;
private Long maxPollDuration;
+ private String camelMessageHeaderKey;
@Override
public String version() {
@@ -65,6 +67,8 @@ public class CamelSourceTask extends SourceTask {
maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
maxPollDuration = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF);
+ camelMessageHeaderKey = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF);
+
final String remoteUrl = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF);
final String unmarshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF);
topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF);
@@ -107,9 +111,13 @@ public class CamelSourceTask extends SourceTask {
Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
+ final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
final Object messageBodyValue = exchange.getMessage().getBody();
- SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, SchemaHelper.buildSchemaBuilderForType(messageBodyValue).build(), messageBodyValue);
+ final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
+ final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
+
+ SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, messageKeySchema, messageHeaderKey, messageBodySchema, messageBodyValue);
if (exchange.getMessage().hasHeaders()) {
setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
index 5c0c295..b0e08a4 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
@@ -20,6 +20,7 @@ import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
@@ -38,6 +39,8 @@ public final class SchemaHelper {
* @return {@link SchemaBuilder} instance
*/
public static SchemaBuilder buildSchemaBuilderForType(final Object value) {
+ Objects.requireNonNull(value);
+
// gracefully try to infer the schema
final Schema knownSchema = Values.inferSchema(value);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index e46e888..07de6b4 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
public class CamelSourceTaskTest {
@@ -62,6 +63,51 @@ public class CamelSourceTaskTest {
}
@Test
+ public void testSourcePollingWithKey() throws InterruptedException {
+ Map<String, String> props = new HashMap<>();
+ props.put("camel.source.url", "direct:start");
+ props.put("camel.source.kafka.topic", "mytopic");
+ props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, "CamelSpecialTestKey");
+
+ CamelSourceTask camelSourceTask = new CamelSourceTask();
+ camelSourceTask.start(props);
+
+ final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate();
+
+ // first we test if we have a key in the message with body
+ template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", 1234);
+
+ Thread.sleep(100L);
+
+ List<SourceRecord> poll = camelSourceTask.poll();
+ assertEquals(1, poll.size());
+ assertEquals(1234, poll.get(0).key());
+ assertEquals(Schema.Type.INT32, poll.get(0).keySchema().type());
+
+ // second we test if we have no key under the header
+ template.sendBodyAndHeader("direct:start", "awesome!", "WrongHeader", 1234);
+
+ Thread.sleep(100L);
+
+ poll = camelSourceTask.poll();
+ assertEquals(1, poll.size());
+ assertNull(poll.get(0).key());
+ assertNull(poll.get(0).keySchema());
+
+ // third we test if we have the header but with null value
+ template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", null);
+
+ Thread.sleep(100L);
+
+ camelSourceTask.poll();
+ assertEquals(1, poll.size());
+ assertNull(poll.get(0).key());
+ assertNull(poll.get(0).keySchema());
+
+ camelSourceTask.stop();
+ }
+
+ @Test
public void testSourcePollingWithBody() throws InterruptedException {
Map<String, String> props = new HashMap<>();
props.put("camel.source.url", "direct:start");
@@ -81,6 +127,8 @@ public class CamelSourceTaskTest {
assertEquals(1, poll.size());
assertEquals("testing kafka connect", poll.get(0).value());
assertEquals(Schema.Type.STRING, poll.get(0).valueSchema().type());
+ assertNull(poll.get(0).key());
+ assertNull(poll.get(0).keySchema());
// send second data
template.sendBody("direct:start", true);
@@ -91,6 +139,8 @@ public class CamelSourceTaskTest {
assertEquals(1, poll.size());
assertTrue((boolean)poll.get(0).value());
assertEquals(Schema.Type.BOOLEAN, poll.get(0).valueSchema().type());
+ assertNull(poll.get(0).key());
+ assertNull(poll.get(0).keySchema());
// second third data
template.sendBody("direct:start", 1234L);
@@ -101,6 +151,18 @@ public class CamelSourceTaskTest {
assertEquals(1, poll.size());
assertEquals(1234L, poll.get(0).value());
assertEquals(Schema.Type.INT64, poll.get(0).valueSchema().type());
+ assertNull(poll.get(0).key());
+ assertNull(poll.get(0).keySchema());
+
+ // third with null data
+ template.sendBody("direct:start", null);
+
+ Thread.sleep(100L);
+ poll = camelSourceTask.poll();
+ assertNull(poll.get(0).key());
+ assertNull(poll.get(0).keySchema());
+ assertNull(poll.get(0).value());
+ assertNull(poll.get(0).valueSchema());
camelSourceTask.stop();
}