You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/12/09 11:36:47 UTC
[camel-kafka-connector] 02/02: Support Map and List as Headers -
List support
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch map-list-as-headers
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 5d007667f5f1c363e5f8ad9c55eb10bf655fbdb1
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Dec 9 12:36:13 2019 +0100
Support Map and List as Headers - List support
---
.../apache/camel/kafkaconnector/CamelSinkTask.java | 7 +-
.../camel/kafkaconnector/CamelSourceTask.java | 3 +
.../camel/kafkaconnector/CamelSinkTaskTest.java | 127 +++++++++++++++++++++
3 files changed, 136 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 39dc58e..02832ef 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -19,6 +19,7 @@ package org.apache.camel.kafkaconnector;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
@@ -125,6 +126,8 @@ public class CamelSinkTask extends SinkTask {
map.put(singleHeader.key(), (byte)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
map.put(singleHeader.key(), (Map)singleHeader.value());
+ } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
+ map.put(singleHeader.key(), (List)singleHeader.value());
}
}
@@ -150,7 +153,9 @@ public class CamelSinkTask extends SinkTask {
exchange.getProperties().put(singleHeader.key(), (byte)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
exchange.getProperties().put(singleHeader.key(), (Map)singleHeader.value());
- }
+ } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
+ exchange.getProperties().put(singleHeader.key(), (List)singleHeader.value());
+ }
}
public CamelMainSupport getCms() {
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 28fe5cf..35d176d 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -31,6 +31,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -182,6 +183,8 @@ public class CamelSourceTask extends SourceTask {
record.headers().addTimestamp(keyCamelHeader, (Timestamp)value);
} else if (value instanceof Map) {
record.headers().addMap(keyCamelHeader, (Map)value, Schema.STRING_SCHEMA);
+ } else if (value instanceof List) {
+ record.headers().addList(keyCamelHeader, (List)value, SchemaBuilder.array(Schema.STRING_SCHEMA));
}
}
}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index bd771c3..4c529e3 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -291,6 +291,9 @@ public class CamelSinkTaskTest {
record.headers().addDouble("CamelPropertyMyDouble", myDouble);
record.headers().addInt("CamelPropertyMyInteger", myInteger);
record.headers().addLong("CamelPropertyMyLong", myLong);
+ record.headers().addMap("CamelPropertyMyMap", map, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA));
+ record.headers().addMap("CamelPropertyMyMap1", map1, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
+ record.headers().addMap("CamelPropertyMyMap2", map2, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
record.headers().addBoolean("CamelHeaderMyBoolean", true);
record.headers().addByte("CamelHeaderMyByte", myByte);
record.headers().addFloat("CamelHeaderMyFloat", myFloat);
@@ -315,6 +318,9 @@ public class CamelSinkTaskTest {
assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble"));
assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger"));
assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong"));
+ assertEquals(map, exchange.getProperties().get("CamelPropertyMyMap"));
+ assertEquals(map1, exchange.getProperties().get("CamelPropertyMyMap1"));
+ assertEquals(map2, exchange.getProperties().get("CamelPropertyMyMap2"));
assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
@@ -328,4 +334,125 @@ public class CamelSinkTaskTest {
camelsinkTask.stop();
}
+
+ @Test
+ public void testBodyAndHeadersList() throws JsonProcessingException, InterruptedException {
+ Map<String, String> props = new HashMap<>();
+ props.put("camel.sink.url", "seda:test");
+ props.put("topics", "mytopic");
+
+ CamelSinkTask camelsinkTask = new CamelSinkTask();
+ camelsinkTask.start(props);
+
+ String topic = "mytopic";
+ Byte myByte = new Byte("100");
+ Float myFloat = new Float("100");
+ Short myShort = new Short("100");
+ Double myDouble = new Double("100");
+ int myInteger = 100;
+ Long myLong = new Long("100");
+ List<String> list = new ArrayList<String>();
+ list.add("a");
+ List<Integer> list1 = new ArrayList<Integer>();
+ list1.add(1);
+
+ List<SinkRecord> records = new ArrayList<SinkRecord>();
+ SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
+ record.headers().addBoolean("CamelHeaderMyBoolean", true);
+ record.headers().addByte("CamelHeaderMyByte", myByte);
+ record.headers().addFloat("CamelHeaderMyFloat", myFloat);
+ record.headers().addShort("CamelHeaderMyShort", myShort);
+ record.headers().addDouble("CamelHeaderMyDouble", myDouble);
+ record.headers().addInt("CamelHeaderMyInteger", myInteger);
+ record.headers().addLong("CamelHeaderMyLong", myLong);
+ record.headers().addList("CamelHeaderMyList", list, SchemaBuilder.array(Schema.STRING_SCHEMA));
+ record.headers().addList("CamelHeaderMyList1", list1, SchemaBuilder.array(Schema.INT64_SCHEMA));
+ records.add(record);
+ camelsinkTask.put(records);
+
+ ConsumerTemplate c = camelsinkTask.getCms().createConsumerTemplate();
+ Exchange exchange = c.receive("seda:test", 1000L);
+ assertEquals("camel", exchange.getMessage().getBody());
+ assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
+ assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
+ assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
+ assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
+ assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
+ assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
+ assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
+ assertEquals(list, exchange.getIn().getHeader("CamelHeaderMyList", List.class));
+ assertEquals(list1, exchange.getIn().getHeader("CamelHeaderMyList1", List.class));
+ camelsinkTask.stop();
+ }
+
+ @Test
+ public void testBodyAndPropertiesHeadersListMixed() throws JsonProcessingException, InterruptedException {
+ Map<String, String> props = new HashMap<>();
+ props.put("camel.sink.url", "seda:test");
+ props.put("topics", "mytopic");
+
+ CamelSinkTask camelsinkTask = new CamelSinkTask();
+ camelsinkTask.start(props);
+
+ String topic = "mytopic";
+ Byte myByte = new Byte("100");
+ Float myFloat = new Float("100");
+ Short myShort = new Short("100");
+ Double myDouble = new Double("100");
+ int myInteger = 100;
+ Long myLong = new Long("100");
+ List<String> list = new ArrayList<String>();
+ list.add("a");
+ List<Integer> list1 = new ArrayList<Integer>();
+ list1.add(1);
+
+ List<SinkRecord> records = new ArrayList<SinkRecord>();
+ SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
+ record.headers().addBoolean("CamelPropertyMyBoolean", true);
+ record.headers().addByte("CamelPropertyMyByte", myByte);
+ record.headers().addFloat("CamelPropertyMyFloat", myFloat);
+ record.headers().addShort("CamelPropertyMyShort", myShort);
+ record.headers().addDouble("CamelPropertyMyDouble", myDouble);
+ record.headers().addInt("CamelPropertyMyInteger", myInteger);
+ record.headers().addLong("CamelPropertyMyLong", myLong);
+ record.headers().addBoolean("CamelHeaderMyBoolean", true);
+ record.headers().addByte("CamelHeaderMyByte", myByte);
+ record.headers().addFloat("CamelHeaderMyFloat", myFloat);
+ record.headers().addShort("CamelHeaderMyShort", myShort);
+ record.headers().addDouble("CamelHeaderMyDouble", myDouble);
+ record.headers().addInt("CamelHeaderMyInteger", myInteger);
+ record.headers().addLong("CamelHeaderMyLong", myLong);
+ record.headers().addList("CamelHeaderMyList", list, SchemaBuilder.array(Schema.STRING_SCHEMA));
+ record.headers().addList("CamelHeaderMyList1", list1, SchemaBuilder.array(Schema.INT64_SCHEMA));
+ record.headers().addList("CamelPropertyMyList", list, SchemaBuilder.array(Schema.STRING_SCHEMA));
+ record.headers().addList("CamelPropertyMyList1", list1, SchemaBuilder.array(Schema.INT64_SCHEMA));
+ records.add(record);
+ camelsinkTask.put(records);
+
+ ConsumerTemplate c = camelsinkTask.getCms().createConsumerTemplate();
+ Exchange exchange = c.receive("seda:test", 1000L);
+ assertEquals("camel", exchange.getMessage().getBody());
+ assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean"));
+ assertEquals(myByte, (Byte) exchange.getProperties().get("CamelPropertyMyByte"));
+ assertEquals(myFloat, (Float) exchange.getProperties().get("CamelPropertyMyFloat"));
+ assertEquals(myShort, (Short) exchange.getProperties().get("CamelPropertyMyShort"));
+ assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble"));
+ assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger"));
+ assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong"));
+ assertEquals(list, exchange.getProperties().get("CamelPropertyMyList"));
+ assertEquals(list1, exchange.getProperties().get("CamelPropertyMyList1"));
+ assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
+ assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
+ assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
+ assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
+ assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
+ assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
+ assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
+ assertEquals(list, exchange.getIn().getHeader("CamelHeaderMyList", List.class));
+ assertEquals(list1, exchange.getIn().getHeader("CamelHeaderMyList1", List.class));
+
+ camelsinkTask.stop();
+ }
}