You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2019/12/11 11:57:09 UTC

[camel-kafka-connector] 01/01: Infer the suitable schema in the SourceTask

This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch fix-schema-source-task
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit bf74ea7f56cc549fc8cc5d1080164ddb239a6555
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Wed Dec 11 12:56:37 2019 +0100

    Infer the suitable schema in the SourceTask
---
 .../camel/kafkaconnector/CamelSourceTask.java      |  7 ++--
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 47 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 54d8eeb..21dce09 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -30,8 +30,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -108,7 +107,9 @@ public class CamelSourceTask extends SourceTask {
                 Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
                 Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
 
-                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, Schema.BYTES_SCHEMA, exchange.getMessage().getBody());
+                final Object messageBodyValue = exchange.getMessage().getBody();
+
+                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, SchemaHelper.buildSchemaBuilderForType(messageBodyValue).build(), messageBodyValue);
                 if (exchange.getMessage().hasHeaders()) {
                     setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
                 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index bea6b8b..e46e888 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -20,6 +20,9 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.camel.ProducerTemplate;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -59,6 +62,50 @@ public class CamelSourceTaskTest {
     }
 
     @Test
+    public void testSourcePollingWithBody() throws InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put("camel.source.url", "direct:start");
+        props.put("camel.source.kafka.topic", "mytopic");
+
+        CamelSourceTask camelSourceTask = new CamelSourceTask();
+        camelSourceTask.start(props);
+
+        final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate();
+
+        // send first data
+        template.sendBody("direct:start", "testing kafka connect");
+
+        Thread.sleep(100L);
+
+        List<SourceRecord> poll = camelSourceTask.poll();
+        assertEquals(1, poll.size());
+        assertEquals("testing kafka connect", poll.get(0).value());
+        assertEquals(Schema.Type.STRING, poll.get(0).valueSchema().type());
+
+        // send second data
+        template.sendBody("direct:start", true);
+
+        Thread.sleep(100L);
+
+        poll = camelSourceTask.poll();
+        assertEquals(1, poll.size());
+        assertTrue((boolean)poll.get(0).value());
+        assertEquals(Schema.Type.BOOLEAN, poll.get(0).valueSchema().type());
+
+        // second third data
+        template.sendBody("direct:start", 1234L);
+
+        Thread.sleep(100L);
+
+        poll = camelSourceTask.poll();
+        assertEquals(1, poll.size());
+        assertEquals(1234L, poll.get(0).value());
+        assertEquals(Schema.Type.INT64, poll.get(0).valueSchema().type());
+
+        camelSourceTask.stop();
+    }
+
+    @Test
     public void testSourcePollingTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put("camel.source.url", "timer:kafkaconnector");