You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2019/12/11 11:57:09 UTC
[camel-kafka-connector] 01/01: Infer the suitable schema in the SourceTask
This is an automated email from the ASF dual-hosted git repository.
oalsafi pushed a commit to branch fix-schema-source-task
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit bf74ea7f56cc549fc8cc5d1080164ddb239a6555
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Wed Dec 11 12:56:37 2019 +0100
Infer the suitable schema in the SourceTask
---
.../camel/kafkaconnector/CamelSourceTask.java | 7 ++--
.../camel/kafkaconnector/CamelSourceTaskTest.java | 47 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 54d8eeb..21dce09 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -30,8 +30,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -108,7 +107,9 @@ public class CamelSourceTask extends SourceTask {
Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
- SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, Schema.BYTES_SCHEMA, exchange.getMessage().getBody());
+ final Object messageBodyValue = exchange.getMessage().getBody();
+
+ SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, SchemaHelper.buildSchemaBuilderForType(messageBodyValue).build(), messageBodyValue);
if (exchange.getMessage().hasHeaders()) {
setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index bea6b8b..e46e888 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -20,6 +20,9 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+
+import org.apache.camel.ProducerTemplate;
+import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
@@ -59,6 +62,50 @@ public class CamelSourceTaskTest {
}
@Test
+ public void testSourcePollingWithBody() throws InterruptedException {
+ Map<String, String> props = new HashMap<>();
+ props.put("camel.source.url", "direct:start");
+ props.put("camel.source.kafka.topic", "mytopic");
+
+ CamelSourceTask camelSourceTask = new CamelSourceTask();
+ camelSourceTask.start(props);
+
+ final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate();
+
+ // send first data
+ template.sendBody("direct:start", "testing kafka connect");
+
+ Thread.sleep(100L);
+
+ List<SourceRecord> poll = camelSourceTask.poll();
+ assertEquals(1, poll.size());
+ assertEquals("testing kafka connect", poll.get(0).value());
+ assertEquals(Schema.Type.STRING, poll.get(0).valueSchema().type());
+
+ // send second data
+ template.sendBody("direct:start", true);
+
+ Thread.sleep(100L);
+
+ poll = camelSourceTask.poll();
+ assertEquals(1, poll.size());
+ assertTrue((boolean)poll.get(0).value());
+ assertEquals(Schema.Type.BOOLEAN, poll.get(0).valueSchema().type());
+
+ // send third data
+ template.sendBody("direct:start", 1234L);
+
+ Thread.sleep(100L);
+
+ poll = camelSourceTask.poll();
+ assertEquals(1, poll.size());
+ assertEquals(1234L, poll.get(0).value());
+ assertEquals(Schema.Type.INT64, poll.get(0).valueSchema().type());
+
+ camelSourceTask.stop();
+ }
+
+ @Test
public void testSourcePollingTimeout() throws InterruptedException {
Map<String, String> props = new HashMap<>();
props.put("camel.source.url", "timer:kafkaconnector");