You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2019/12/11 11:57:08 UTC

[camel-kafka-connector] branch fix-schema-source-task created (now bf74ea7)

This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a change to branch fix-schema-source-task
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


      at bf74ea7  Infer the suitable schema in the SourceTask

This branch includes the following new commits:

     new bf74ea7  Infer the suitable schema in the SourceTask

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector] 01/01: Infer the suitable schema in the SourceTask

Posted by oa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch fix-schema-source-task
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit bf74ea7f56cc549fc8cc5d1080164ddb239a6555
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Wed Dec 11 12:56:37 2019 +0100

    Infer the suitable schema in the SourceTask
---
 .../camel/kafkaconnector/CamelSourceTask.java      |  7 ++--
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 47 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 54d8eeb..21dce09 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -30,8 +30,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -108,7 +107,9 @@ public class CamelSourceTask extends SourceTask {
                 Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
                 Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
 
-                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, Schema.BYTES_SCHEMA, exchange.getMessage().getBody());
+                final Object messageBodyValue = exchange.getMessage().getBody();
+
+                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, SchemaHelper.buildSchemaBuilderForType(messageBodyValue).build(), messageBodyValue);
                 if (exchange.getMessage().hasHeaders()) {
                     setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
                 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index bea6b8b..e46e888 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -20,6 +20,9 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.camel.ProducerTemplate;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -59,6 +62,50 @@ public class CamelSourceTaskTest {
     }
 
     @Test
+    public void testSourcePollingWithBody() throws InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put("camel.source.url", "direct:start");
+        props.put("camel.source.kafka.topic", "mytopic");
+
+        CamelSourceTask camelSourceTask = new CamelSourceTask();
+        camelSourceTask.start(props);
+
+        final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate();
+
+        // send first data
+        template.sendBody("direct:start", "testing kafka connect");
+
+        Thread.sleep(100L);
+
+        List<SourceRecord> poll = camelSourceTask.poll();
+        assertEquals(1, poll.size());
+        assertEquals("testing kafka connect", poll.get(0).value());
+        assertEquals(Schema.Type.STRING, poll.get(0).valueSchema().type());
+
+        // send second data
+        template.sendBody("direct:start", true);
+
+        Thread.sleep(100L);
+
+        poll = camelSourceTask.poll();
+        assertEquals(1, poll.size());
+        assertTrue((boolean)poll.get(0).value());
+        assertEquals(Schema.Type.BOOLEAN, poll.get(0).valueSchema().type());
+
+        // send third data
+        template.sendBody("direct:start", 1234L);
+
+        Thread.sleep(100L);
+
+        poll = camelSourceTask.poll();
+        assertEquals(1, poll.size());
+        assertEquals(1234L, poll.get(0).value());
+        assertEquals(Schema.Type.INT64, poll.get(0).valueSchema().type());
+
+        camelSourceTask.stop();
+    }
+
+    @Test
     public void testSourcePollingTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put("camel.source.url", "timer:kafkaconnector");