You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/12/09 11:36:45 UTC

[camel-kafka-connector] branch map-list-as-headers created (now 5d00766)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch map-list-as-headers
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


      at 5d00766  Support Map and List as Headers - List support

This branch includes the following new commits:

     new d1d0a48  Support Map and List as Headers - Map support
     new 5d00766  Support Map and List as Headers - List support

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector] 01/02: Support Map and List as Headers - Map support

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch map-list-as-headers
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit d1d0a48503a662745d3526a16d8f49e164c329b7
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Dec 9 12:07:43 2019 +0100

    Support Map and List as Headers - Map support
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java |   7 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |   2 +
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 127 +++++++++++++++++++++
 3 files changed, 135 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 80c018f..39dc58e 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -25,6 +25,7 @@ import org.apache.camel.ProducerTemplate;
 import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -122,7 +123,9 @@ public class CamelSinkTask extends SinkTask {
             map.put(singleHeader.key(), (long)singleHeader.value());
         } else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) {
             map.put(singleHeader.key(), (byte)singleHeader.value());
-        }
+        } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
+        	map.put(singleHeader.key(), (Map)singleHeader.value());
+        } 
     }
 
     private void addProperty(Exchange exchange, Header singleHeader) {
@@ -145,6 +148,8 @@ public class CamelSinkTask extends SinkTask {
             exchange.getProperties().put(singleHeader.key(), (long)singleHeader.value());
         } else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) {
             exchange.getProperties().put(singleHeader.key(), (byte)singleHeader.value());
+        } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
+        	exchange.getProperties().put(singleHeader.key(), (Map)singleHeader.value());
         }
     }
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index e0147c0..28fe5cf 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -180,6 +180,8 @@ public class CamelSourceTask extends SourceTask {
                 record.headers().addTime(keyCamelHeader, (Time)value);
             } else if (value instanceof Timestamp) {
                 record.headers().addTimestamp(keyCamelHeader, (Timestamp)value);
+            } else if (value instanceof Map) {
+                record.headers().addMap(keyCamelHeader, (Map)value, Schema.STRING_SCHEMA);
             }
         }
     }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 3ae60be..bd771c3 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Exchange;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.Test;
 
@@ -201,4 +203,129 @@ public class CamelSinkTaskTest {
 
         camelsinkTask.stop();
     }
+    
+    @Test
+    public void testBodyAndHeadersMap() throws JsonProcessingException, InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put("camel.sink.url", "seda:test");
+        props.put("topics", "mytopic");
+
+        CamelSinkTask camelsinkTask = new CamelSinkTask();
+        camelsinkTask.start(props);
+
+        String topic = "mytopic";
+        Byte myByte = new Byte("100");
+        Float myFloat = new Float("100");
+        Short myShort = new Short("100");
+        Double myDouble = new Double("100");
+        int myInteger = 100;
+        Long myLong = new Long("100");
+        HashMap<String, String> map = new HashMap<String, String>();
+        map.put("a", "a");
+        HashMap<Integer, String> map1 = new HashMap<Integer, String>();
+        map1.put(1, "a");
+        HashMap<Integer, Integer> map2 = new HashMap<Integer, Integer>();
+        map2.put(1, 1);
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
+        record.headers().addBoolean("CamelHeaderMyBoolean", true);
+        record.headers().addByte("CamelHeaderMyByte", myByte);
+        record.headers().addFloat("CamelHeaderMyFloat", myFloat);
+        record.headers().addShort("CamelHeaderMyShort", myShort);
+        record.headers().addDouble("CamelHeaderMyDouble", myDouble);
+        record.headers().addInt("CamelHeaderMyInteger", myInteger);
+        record.headers().addLong("CamelHeaderMyLong", myLong);
+        record.headers().addMap("CamelHeaderMyMap", map, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA));
+        record.headers().addMap("CamelHeaderMyMap1", map1, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
+        record.headers().addMap("CamelHeaderMyMap2", map2, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
+        records.add(record);
+        camelsinkTask.put(records);
+
+        ConsumerTemplate c = camelsinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = c.receive("seda:test", 1000L);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
+        assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
+        assertEquals(map, exchange.getIn().getHeader("CamelHeaderMyMap", Map.class));
+        assertEquals(map1, exchange.getIn().getHeader("CamelHeaderMyMap1", Map.class));
+        assertEquals(map2, exchange.getIn().getHeader("CamelHeaderMyMap2", Map.class));
+        camelsinkTask.stop();
+    }
+    
+    @Test
+    public void testBodyAndPropertiesHeadersMapMixed() throws JsonProcessingException, InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put("camel.sink.url", "seda:test");
+        props.put("topics", "mytopic");
+
+        CamelSinkTask camelsinkTask = new CamelSinkTask();
+        camelsinkTask.start(props);
+
+        String topic = "mytopic";
+        Byte myByte = new Byte("100");
+        Float myFloat = new Float("100");
+        Short myShort = new Short("100");
+        Double myDouble = new Double("100");
+        int myInteger = 100;
+        Long myLong = new Long("100");
+        HashMap<String, String> map = new HashMap<String, String>();
+        map.put("a", "a");
+        HashMap<Integer, String> map1 = new HashMap<Integer, String>();
+        map1.put(1, "a");
+        HashMap<Integer, Integer> map2 = new HashMap<Integer, Integer>();
+        map2.put(1, 1);
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
+        record.headers().addBoolean("CamelPropertyMyBoolean", true);
+        record.headers().addByte("CamelPropertyMyByte", myByte);
+        record.headers().addFloat("CamelPropertyMyFloat", myFloat);
+        record.headers().addShort("CamelPropertyMyShort", myShort);
+        record.headers().addDouble("CamelPropertyMyDouble", myDouble);
+        record.headers().addInt("CamelPropertyMyInteger", myInteger);
+        record.headers().addLong("CamelPropertyMyLong", myLong);
+        record.headers().addBoolean("CamelHeaderMyBoolean", true);
+        record.headers().addByte("CamelHeaderMyByte", myByte);
+        record.headers().addFloat("CamelHeaderMyFloat", myFloat);
+        record.headers().addShort("CamelHeaderMyShort", myShort);
+        record.headers().addDouble("CamelHeaderMyDouble", myDouble);
+        record.headers().addInt("CamelHeaderMyInteger", myInteger);
+        record.headers().addLong("CamelHeaderMyLong", myLong);
+        record.headers().addMap("CamelHeaderMyMap", map, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA));
+        record.headers().addMap("CamelHeaderMyMap1", map1, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
+        record.headers().addMap("CamelHeaderMyMap2", map2, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
+        records.add(record);
+        camelsinkTask.put(records);
+
+        ConsumerTemplate c = camelsinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = c.receive("seda:test", 1000L);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean"));
+        assertEquals(myByte, (Byte) exchange.getProperties().get("CamelPropertyMyByte"));
+        assertEquals(myFloat, (Float) exchange.getProperties().get("CamelPropertyMyFloat"));
+        assertEquals(myShort, (Short) exchange.getProperties().get("CamelPropertyMyShort"));
+        assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble"));
+        assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger"));
+        assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong"));
+        assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
+        assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
+        assertEquals(map, exchange.getIn().getHeader("CamelHeaderMyMap", Map.class));
+        assertEquals(map1, exchange.getIn().getHeader("CamelHeaderMyMap1", Map.class));
+        assertEquals(map2, exchange.getIn().getHeader("CamelHeaderMyMap2", Map.class));
+
+        camelsinkTask.stop();
+    }
 }


[camel-kafka-connector] 02/02: Support Map and List as Headers - List support

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch map-list-as-headers
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 5d007667f5f1c363e5f8ad9c55eb10bf655fbdb1
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Dec 9 12:36:13 2019 +0100

    Support Map and List as Headers - List support
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java |   7 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |   3 +
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 127 +++++++++++++++++++++
 3 files changed, 136 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 39dc58e..02832ef 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -19,6 +19,7 @@ package org.apache.camel.kafkaconnector;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import org.apache.camel.Exchange;
 import org.apache.camel.ProducerTemplate;
@@ -125,6 +126,8 @@ public class CamelSinkTask extends SinkTask {
             map.put(singleHeader.key(), (byte)singleHeader.value());
         } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
         	map.put(singleHeader.key(), (Map)singleHeader.value());
+        } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
+        	map.put(singleHeader.key(), (List)singleHeader.value());
         } 
     }
 
@@ -150,7 +153,9 @@ public class CamelSinkTask extends SinkTask {
             exchange.getProperties().put(singleHeader.key(), (byte)singleHeader.value());
         } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
         	exchange.getProperties().put(singleHeader.key(), (Map)singleHeader.value());
-        }
+        } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
+        	exchange.getProperties().put(singleHeader.key(), (List)singleHeader.value());
+        } 
     }
 
     public CamelMainSupport getCms() {
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 28fe5cf..35d176d 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -31,6 +31,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -182,6 +183,8 @@ public class CamelSourceTask extends SourceTask {
                 record.headers().addTimestamp(keyCamelHeader, (Timestamp)value);
             } else if (value instanceof Map) {
                 record.headers().addMap(keyCamelHeader, (Map)value, Schema.STRING_SCHEMA);
+            } else if (value instanceof List) {
+                record.headers().addList(keyCamelHeader, (List)value, SchemaBuilder.array(Schema.STRING_SCHEMA));
             }
         }
     }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index bd771c3..4c529e3 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -291,6 +291,9 @@ public class CamelSinkTaskTest {
         record.headers().addDouble("CamelPropertyMyDouble", myDouble);
         record.headers().addInt("CamelPropertyMyInteger", myInteger);
         record.headers().addLong("CamelPropertyMyLong", myLong);
+        record.headers().addMap("CamelPropertyMyMap", map, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA));
+        record.headers().addMap("CamelPropertyMyMap1", map1, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
+        record.headers().addMap("CamelPropertyMyMap2", map2, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
         record.headers().addBoolean("CamelHeaderMyBoolean", true);
         record.headers().addByte("CamelHeaderMyByte", myByte);
         record.headers().addFloat("CamelHeaderMyFloat", myFloat);
@@ -315,6 +318,9 @@ public class CamelSinkTaskTest {
         assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble"));
         assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger"));
         assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong"));
+        assertEquals(map, exchange.getProperties().get("CamelPropertyMyMap"));
+        assertEquals(map1, exchange.getProperties().get("CamelPropertyMyMap1"));
+        assertEquals(map2, exchange.getProperties().get("CamelPropertyMyMap2"));
         assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
         assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
         assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
@@ -328,4 +334,125 @@ public class CamelSinkTaskTest {
 
         camelsinkTask.stop();
     }
+    
+    @Test
+    public void testBodyAndHeadersList() throws JsonProcessingException, InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put("camel.sink.url", "seda:test");
+        props.put("topics", "mytopic");
+
+        CamelSinkTask camelsinkTask = new CamelSinkTask();
+        camelsinkTask.start(props);
+
+        String topic = "mytopic";
+        Byte myByte = new Byte("100");
+        Float myFloat = new Float("100");
+        Short myShort = new Short("100");
+        Double myDouble = new Double("100");
+        int myInteger = 100;
+        Long myLong = new Long("100");
+        List<String> list = new ArrayList<String>();
+        list.add("a");
+        List<Integer> list1 = new ArrayList<Integer>();
+        list1.add(1);
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
+        record.headers().addBoolean("CamelHeaderMyBoolean", true);
+        record.headers().addByte("CamelHeaderMyByte", myByte);
+        record.headers().addFloat("CamelHeaderMyFloat", myFloat);
+        record.headers().addShort("CamelHeaderMyShort", myShort);
+        record.headers().addDouble("CamelHeaderMyDouble", myDouble);
+        record.headers().addInt("CamelHeaderMyInteger", myInteger);
+        record.headers().addLong("CamelHeaderMyLong", myLong);
+        record.headers().addList("CamelHeaderMyList", list, SchemaBuilder.array(Schema.STRING_SCHEMA));
+        record.headers().addList("CamelHeaderMyList1", list1, SchemaBuilder.array(Schema.INT64_SCHEMA));
+        records.add(record);
+        camelsinkTask.put(records);
+
+        ConsumerTemplate c = camelsinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = c.receive("seda:test", 1000L);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
+        assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
+        assertEquals(list, exchange.getIn().getHeader("CamelHeaderMyList", List.class));
+        assertEquals(list1, exchange.getIn().getHeader("CamelHeaderMyList1", List.class));
+        camelsinkTask.stop();
+    }
+    
+    @Test
+    public void testBodyAndPropertiesHeadersListMixed() throws JsonProcessingException, InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put("camel.sink.url", "seda:test");
+        props.put("topics", "mytopic");
+
+        CamelSinkTask camelsinkTask = new CamelSinkTask();
+        camelsinkTask.start(props);
+
+        String topic = "mytopic";
+        Byte myByte = new Byte("100");
+        Float myFloat = new Float("100");
+        Short myShort = new Short("100");
+        Double myDouble = new Double("100");
+        int myInteger = 100;
+        Long myLong = new Long("100");
+        List<String> list = new ArrayList<String>();
+        list.add("a");
+        List<Integer> list1 = new ArrayList<Integer>();
+        list1.add(1);
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
+        record.headers().addBoolean("CamelPropertyMyBoolean", true);
+        record.headers().addByte("CamelPropertyMyByte", myByte);
+        record.headers().addFloat("CamelPropertyMyFloat", myFloat);
+        record.headers().addShort("CamelPropertyMyShort", myShort);
+        record.headers().addDouble("CamelPropertyMyDouble", myDouble);
+        record.headers().addInt("CamelPropertyMyInteger", myInteger);
+        record.headers().addLong("CamelPropertyMyLong", myLong);
+        record.headers().addBoolean("CamelHeaderMyBoolean", true);
+        record.headers().addByte("CamelHeaderMyByte", myByte);
+        record.headers().addFloat("CamelHeaderMyFloat", myFloat);
+        record.headers().addShort("CamelHeaderMyShort", myShort);
+        record.headers().addDouble("CamelHeaderMyDouble", myDouble);
+        record.headers().addInt("CamelHeaderMyInteger", myInteger);
+        record.headers().addLong("CamelHeaderMyLong", myLong);
+        record.headers().addList("CamelHeaderMyList", list, SchemaBuilder.array(Schema.STRING_SCHEMA));
+        record.headers().addList("CamelHeaderMyList1", list1, SchemaBuilder.array(Schema.INT64_SCHEMA));
+        record.headers().addList("CamelPropertyMyList", list, SchemaBuilder.array(Schema.STRING_SCHEMA));
+        record.headers().addList("CamelPropertyMyList1", list1, SchemaBuilder.array(Schema.INT64_SCHEMA));
+        records.add(record);
+        camelsinkTask.put(records);
+
+        ConsumerTemplate c = camelsinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = c.receive("seda:test", 1000L);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean"));
+        assertEquals(myByte, (Byte) exchange.getProperties().get("CamelPropertyMyByte"));
+        assertEquals(myFloat, (Float) exchange.getProperties().get("CamelPropertyMyFloat"));
+        assertEquals(myShort, (Short) exchange.getProperties().get("CamelPropertyMyShort"));
+        assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble"));
+        assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger"));
+        assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong"));
+        assertEquals(list, exchange.getProperties().get("CamelPropertyMyList"));
+        assertEquals(list1, exchange.getProperties().get("CamelPropertyMyList1"));
+        assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
+        assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
+        assertEquals(list, exchange.getIn().getHeader("CamelHeaderMyList", List.class));
+        assertEquals(list1, exchange.getIn().getHeader("CamelHeaderMyList1", List.class));
+
+        camelsinkTask.stop();
+    }
 }