You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/12/09 11:36:46 UTC

[camel-kafka-connector] 01/02: Support Map and List as Headers - Map support

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch map-list-as-headers
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit d1d0a48503a662745d3526a16d8f49e164c329b7
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Dec 9 12:07:43 2019 +0100

    Support Map and List as Headers - Map support
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java |   7 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |   2 +
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 127 +++++++++++++++++++++
 3 files changed, 135 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 80c018f..39dc58e 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -25,6 +25,7 @@ import org.apache.camel.ProducerTemplate;
 import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -122,7 +123,9 @@ public class CamelSinkTask extends SinkTask {
             map.put(singleHeader.key(), (long)singleHeader.value());
         } else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) {
             map.put(singleHeader.key(), (byte)singleHeader.value());
-        }
+        } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
+        	map.put(singleHeader.key(), (Map)singleHeader.value());
+        } 
     }
 
     private void addProperty(Exchange exchange, Header singleHeader) {
@@ -145,6 +148,8 @@ public class CamelSinkTask extends SinkTask {
             exchange.getProperties().put(singleHeader.key(), (long)singleHeader.value());
         } else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) {
             exchange.getProperties().put(singleHeader.key(), (byte)singleHeader.value());
+        } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
+        	exchange.getProperties().put(singleHeader.key(), (Map)singleHeader.value());
         }
     }
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index e0147c0..28fe5cf 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -180,6 +180,8 @@ public class CamelSourceTask extends SourceTask {
                 record.headers().addTime(keyCamelHeader, (Time)value);
             } else if (value instanceof Timestamp) {
                 record.headers().addTimestamp(keyCamelHeader, (Timestamp)value);
+            } else if (value instanceof Map) {
+                record.headers().addMap(keyCamelHeader, (Map)value, Schema.STRING_SCHEMA);
             }
         }
     }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 3ae60be..bd771c3 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Exchange;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.Test;
 
@@ -201,4 +203,129 @@ public class CamelSinkTaskTest {
 
         camelsinkTask.stop();
     }
+    
+    @Test // Checks that scalar and map-valued record headers are readable from the received Camel message.
+    public void testBodyAndHeadersMap() throws JsonProcessingException, InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put("camel.sink.url", "seda:test");
+        props.put("topics", "mytopic");
+        // Start the sink task with the configuration built above.
+        CamelSinkTask camelsinkTask = new CamelSinkTask();
+        camelsinkTask.start(props);
+        // Fixture values: one per scalar header type, plus three maps with different key/value types.
+        String topic = "mytopic";
+        Byte myByte = new Byte("100");
+        Float myFloat = new Float("100");
+        Short myShort = new Short("100");
+        Double myDouble = new Double("100");
+        int myInteger = 100;
+        Long myLong = new Long("100");
+        HashMap<String, String> map = new HashMap<String, String>();
+        map.put("a", "a");
+        HashMap<Integer, String> map1 = new HashMap<Integer, String>();
+        map1.put(1, "a");
+        HashMap<Integer, Integer> map2 = new HashMap<Integer, Integer>();
+        map2.put(1, 1);
+        // NOTE(review): map1/map2 hold Integer entries but are registered below with INT64 schemas - confirm this is intended.
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
+        record.headers().addBoolean("CamelHeaderMyBoolean", true);
+        record.headers().addByte("CamelHeaderMyByte", myByte);
+        record.headers().addFloat("CamelHeaderMyFloat", myFloat);
+        record.headers().addShort("CamelHeaderMyShort", myShort);
+        record.headers().addDouble("CamelHeaderMyDouble", myDouble);
+        record.headers().addInt("CamelHeaderMyInteger", myInteger);
+        record.headers().addLong("CamelHeaderMyLong", myLong);
+        record.headers().addMap("CamelHeaderMyMap", map, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA));
+        record.headers().addMap("CamelHeaderMyMap1", map1, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
+        record.headers().addMap("CamelHeaderMyMap2", map2, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
+        records.add(record);
+        camelsinkTask.put(records);
+        // Receive the exchange from seda:test (1000 ms timeout) and compare body, record key and every header.
+        ConsumerTemplate c = camelsinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = c.receive("seda:test", 1000L);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
+        assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
+        assertEquals(map, exchange.getIn().getHeader("CamelHeaderMyMap", Map.class));
+        assertEquals(map1, exchange.getIn().getHeader("CamelHeaderMyMap1", Map.class));
+        assertEquals(map2, exchange.getIn().getHeader("CamelHeaderMyMap2", Map.class));
+        camelsinkTask.stop();
+    }
+    
+    @Test // Checks a record mixing CamelProperty* and CamelHeader* entries, the latter including map-valued headers.
+    public void testBodyAndPropertiesHeadersMapMixed() throws JsonProcessingException, InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put("camel.sink.url", "seda:test");
+        props.put("topics", "mytopic");
+        // Start the sink task with the configuration built above.
+        CamelSinkTask camelsinkTask = new CamelSinkTask();
+        camelsinkTask.start(props);
+        // Fixture values: one per scalar type, plus three maps with different key/value types.
+        String topic = "mytopic";
+        Byte myByte = new Byte("100");
+        Float myFloat = new Float("100");
+        Short myShort = new Short("100");
+        Double myDouble = new Double("100");
+        int myInteger = 100;
+        Long myLong = new Long("100");
+        HashMap<String, String> map = new HashMap<String, String>();
+        map.put("a", "a");
+        HashMap<Integer, String> map1 = new HashMap<Integer, String>();
+        map1.put(1, "a");
+        HashMap<Integer, Integer> map2 = new HashMap<Integer, Integer>();
+        map2.put(1, 1);
+        // NOTE(review): map1/map2 hold Integer entries but are registered below with INT64 schemas - confirm this is intended.
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
+        record.headers().addBoolean("CamelPropertyMyBoolean", true);
+        record.headers().addByte("CamelPropertyMyByte", myByte);
+        record.headers().addFloat("CamelPropertyMyFloat", myFloat);
+        record.headers().addShort("CamelPropertyMyShort", myShort);
+        record.headers().addDouble("CamelPropertyMyDouble", myDouble);
+        record.headers().addInt("CamelPropertyMyInteger", myInteger);
+        record.headers().addLong("CamelPropertyMyLong", myLong);
+        record.headers().addBoolean("CamelHeaderMyBoolean", true);
+        record.headers().addByte("CamelHeaderMyByte", myByte);
+        record.headers().addFloat("CamelHeaderMyFloat", myFloat);
+        record.headers().addShort("CamelHeaderMyShort", myShort);
+        record.headers().addDouble("CamelHeaderMyDouble", myDouble);
+        record.headers().addInt("CamelHeaderMyInteger", myInteger);
+        record.headers().addLong("CamelHeaderMyLong", myLong);
+        record.headers().addMap("CamelHeaderMyMap", map, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA));
+        record.headers().addMap("CamelHeaderMyMap1", map1, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
+        record.headers().addMap("CamelHeaderMyMap2", map2, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
+        records.add(record);
+        camelsinkTask.put(records);
+        // Receive the exchange from seda:test (1000 ms timeout); check exchange properties first, then message headers.
+        ConsumerTemplate c = camelsinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = c.receive("seda:test", 1000L);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean"));
+        assertEquals(myByte, (Byte) exchange.getProperties().get("CamelPropertyMyByte"));
+        assertEquals(myFloat, (Float) exchange.getProperties().get("CamelPropertyMyFloat"));
+        assertEquals(myShort, (Short) exchange.getProperties().get("CamelPropertyMyShort"));
+        assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble"));
+        assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger"));
+        assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong"));
+        assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
+        assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
+        assertEquals(map, exchange.getIn().getHeader("CamelHeaderMyMap", Map.class));
+        assertEquals(map1, exchange.getIn().getHeader("CamelHeaderMyMap1", Map.class));
+        assertEquals(map2, exchange.getIn().getHeader("CamelHeaderMyMap2", Map.class));
+        // NOTE(review): no CamelProperty* map header is added above, so map-valued exchange properties are not asserted here.
+        camelsinkTask.stop();
+    }
 }