You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/12/09 11:36:47 UTC

[camel-kafka-connector] 02/02: Support Map and List as Headers - List support

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch map-list-as-headers
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 5d007667f5f1c363e5f8ad9c55eb10bf655fbdb1
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Dec 9 12:36:13 2019 +0100

    Support Map and List as Headers - List support
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java |   7 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |   3 +
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 127 +++++++++++++++++++++
 3 files changed, 136 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 39dc58e..02832ef 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -19,6 +19,7 @@ package org.apache.camel.kafkaconnector;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import org.apache.camel.Exchange;
 import org.apache.camel.ProducerTemplate;
@@ -125,6 +126,8 @@ public class CamelSinkTask extends SinkTask {
             map.put(singleHeader.key(), (byte)singleHeader.value());
         } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
         	map.put(singleHeader.key(), (Map)singleHeader.value());
+        } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
+        	map.put(singleHeader.key(), (List)singleHeader.value());
         } 
     }
 
@@ -150,7 +153,9 @@ public class CamelSinkTask extends SinkTask {
             exchange.getProperties().put(singleHeader.key(), (byte)singleHeader.value());
         } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
         	exchange.getProperties().put(singleHeader.key(), (Map)singleHeader.value());
-        }
+        } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
+        	exchange.getProperties().put(singleHeader.key(), (List)singleHeader.value());
+        } 
     }
 
     public CamelMainSupport getCms() {
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 28fe5cf..35d176d 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -31,6 +31,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -182,6 +183,8 @@ public class CamelSourceTask extends SourceTask {
                 record.headers().addTimestamp(keyCamelHeader, (Timestamp)value);
             } else if (value instanceof Map) {
                 record.headers().addMap(keyCamelHeader, (Map)value, Schema.STRING_SCHEMA);
+            } else if (value instanceof List) {
+                record.headers().addList(keyCamelHeader, (List)value, SchemaBuilder.array(Schema.STRING_SCHEMA));
             }
         }
     }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index bd771c3..4c529e3 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -291,6 +291,9 @@ public class CamelSinkTaskTest {
         record.headers().addDouble("CamelPropertyMyDouble", myDouble);
         record.headers().addInt("CamelPropertyMyInteger", myInteger);
         record.headers().addLong("CamelPropertyMyLong", myLong);
+        record.headers().addMap("CamelPropertyMyMap", map, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA));
+        record.headers().addMap("CamelPropertyMyMap1", map1, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
+        record.headers().addMap("CamelPropertyMyMap2", map2, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
         record.headers().addBoolean("CamelHeaderMyBoolean", true);
         record.headers().addByte("CamelHeaderMyByte", myByte);
         record.headers().addFloat("CamelHeaderMyFloat", myFloat);
@@ -315,6 +318,9 @@ public class CamelSinkTaskTest {
         assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble"));
         assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger"));
         assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong"));
+        assertEquals(map, exchange.getProperties().get("CamelPropertyMyMap"));
+        assertEquals(map1, exchange.getProperties().get("CamelPropertyMyMap1"));
+        assertEquals(map2, exchange.getProperties().get("CamelPropertyMyMap2"));
         assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
         assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
         assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
@@ -328,4 +334,125 @@ public class CamelSinkTaskTest {
 
         camelsinkTask.stop();
     }
+    
+    @Test
+    public void testBodyAndHeadersList() throws JsonProcessingException, InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put("camel.sink.url", "seda:test");
+        props.put("topics", "mytopic");
+
+        CamelSinkTask camelsinkTask = new CamelSinkTask();
+        camelsinkTask.start(props);
+
+        String topic = "mytopic";
+        Byte myByte = new Byte("100");
+        Float myFloat = new Float("100");
+        Short myShort = new Short("100");
+        Double myDouble = new Double("100");
+        int myInteger = 100;
+        Long myLong = new Long("100");
+        List<String> list = new ArrayList<String>();
+        list.add("a");
+        List<Integer> list1 = new ArrayList<Integer>();
+        list1.add(1);
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
+        record.headers().addBoolean("CamelHeaderMyBoolean", true);
+        record.headers().addByte("CamelHeaderMyByte", myByte);
+        record.headers().addFloat("CamelHeaderMyFloat", myFloat);
+        record.headers().addShort("CamelHeaderMyShort", myShort);
+        record.headers().addDouble("CamelHeaderMyDouble", myDouble);
+        record.headers().addInt("CamelHeaderMyInteger", myInteger);
+        record.headers().addLong("CamelHeaderMyLong", myLong);
+        record.headers().addList("CamelHeaderMyList", list, SchemaBuilder.array(Schema.STRING_SCHEMA));
+        record.headers().addList("CamelHeaderMyList1", list1, SchemaBuilder.array(Schema.INT64_SCHEMA));
+        records.add(record);
+        camelsinkTask.put(records);
+
+        ConsumerTemplate c = camelsinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = c.receive("seda:test", 1000L);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
+        assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
+        assertEquals(list, exchange.getIn().getHeader("CamelHeaderMyList", List.class));
+        assertEquals(list1, exchange.getIn().getHeader("CamelHeaderMyList1", List.class));
+        camelsinkTask.stop();
+    }
+    
+    @Test
+    public void testBodyAndPropertiesHeadersListMixed() throws JsonProcessingException, InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put("camel.sink.url", "seda:test");
+        props.put("topics", "mytopic");
+
+        CamelSinkTask camelsinkTask = new CamelSinkTask();
+        camelsinkTask.start(props);
+
+        String topic = "mytopic";
+        Byte myByte = new Byte("100");
+        Float myFloat = new Float("100");
+        Short myShort = new Short("100");
+        Double myDouble = new Double("100");
+        int myInteger = 100;
+        Long myLong = new Long("100");
+        List<String> list = new ArrayList<String>();
+        list.add("a");
+        List<Integer> list1 = new ArrayList<Integer>();
+        list1.add(1);
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
+        record.headers().addBoolean("CamelPropertyMyBoolean", true);
+        record.headers().addByte("CamelPropertyMyByte", myByte);
+        record.headers().addFloat("CamelPropertyMyFloat", myFloat);
+        record.headers().addShort("CamelPropertyMyShort", myShort);
+        record.headers().addDouble("CamelPropertyMyDouble", myDouble);
+        record.headers().addInt("CamelPropertyMyInteger", myInteger);
+        record.headers().addLong("CamelPropertyMyLong", myLong);
+        record.headers().addBoolean("CamelHeaderMyBoolean", true);
+        record.headers().addByte("CamelHeaderMyByte", myByte);
+        record.headers().addFloat("CamelHeaderMyFloat", myFloat);
+        record.headers().addShort("CamelHeaderMyShort", myShort);
+        record.headers().addDouble("CamelHeaderMyDouble", myDouble);
+        record.headers().addInt("CamelHeaderMyInteger", myInteger);
+        record.headers().addLong("CamelHeaderMyLong", myLong);
+        record.headers().addList("CamelHeaderMyList", list, SchemaBuilder.array(Schema.STRING_SCHEMA));
+        record.headers().addList("CamelHeaderMyList1", list1, SchemaBuilder.array(Schema.INT64_SCHEMA));
+        record.headers().addList("CamelPropertyMyList", list, SchemaBuilder.array(Schema.STRING_SCHEMA));
+        record.headers().addList("CamelPropertyMyList1", list1, SchemaBuilder.array(Schema.INT64_SCHEMA));
+        records.add(record);
+        camelsinkTask.put(records);
+
+        ConsumerTemplate c = camelsinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = c.receive("seda:test", 1000L);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean"));
+        assertEquals(myByte, (Byte) exchange.getProperties().get("CamelPropertyMyByte"));
+        assertEquals(myFloat, (Float) exchange.getProperties().get("CamelPropertyMyFloat"));
+        assertEquals(myShort, (Short) exchange.getProperties().get("CamelPropertyMyShort"));
+        assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble"));
+        assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger"));
+        assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong"));
+        assertEquals(list, exchange.getProperties().get("CamelPropertyMyList"));
+        assertEquals(list1, exchange.getProperties().get("CamelPropertyMyList1"));
+        assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
+        assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
+        assertEquals(list, exchange.getIn().getHeader("CamelHeaderMyList", List.class));
+        assertEquals(list1, exchange.getIn().getHeader("CamelHeaderMyList1", List.class));
+
+        camelsinkTask.stop();
+    }
 }