You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/12/09 11:36:46 UTC
[camel-kafka-connector] 01/02: Support Map and List as Headers - Map support
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch map-list-as-headers
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit d1d0a48503a662745d3526a16d8f49e164c329b7
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Dec 9 12:07:43 2019 +0100
Support Map and List as Headers - Map support
---
.../apache/camel/kafkaconnector/CamelSinkTask.java | 7 +-
.../camel/kafkaconnector/CamelSourceTask.java | 2 +
.../camel/kafkaconnector/CamelSinkTaskTest.java | 127 +++++++++++++++++++++
3 files changed, 135 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 80c018f..39dc58e 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -25,6 +25,7 @@ import org.apache.camel.ProducerTemplate;
import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
import org.apache.camel.support.DefaultExchange;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -122,7 +123,9 @@ public class CamelSinkTask extends SinkTask {
map.put(singleHeader.key(), (long)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) {
map.put(singleHeader.key(), (byte)singleHeader.value());
- }
+ } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
+ map.put(singleHeader.key(), (Map)singleHeader.value());
+ }
}
private void addProperty(Exchange exchange, Header singleHeader) {
@@ -145,6 +148,8 @@ public class CamelSinkTask extends SinkTask {
exchange.getProperties().put(singleHeader.key(), (long)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) {
exchange.getProperties().put(singleHeader.key(), (byte)singleHeader.value());
+ } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
+ exchange.getProperties().put(singleHeader.key(), (Map)singleHeader.value());
}
}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index e0147c0..28fe5cf 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -180,6 +180,8 @@ public class CamelSourceTask extends SourceTask {
record.headers().addTime(keyCamelHeader, (Time)value);
} else if (value instanceof Timestamp) {
record.headers().addTimestamp(keyCamelHeader, (Timestamp)value);
+ } else if (value instanceof Map) {
+ record.headers().addMap(keyCamelHeader, (Map)value, Schema.STRING_SCHEMA);
}
}
}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 3ae60be..bd771c3 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -24,6 +24,8 @@ import java.util.Map;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
@@ -201,4 +203,129 @@ public class CamelSinkTaskTest {
camelsinkTask.stop();
}
+
+ @Test
+ public void testBodyAndHeadersMap() throws JsonProcessingException, InterruptedException {
+ Map<String, String> props = new HashMap<>();
+ props.put("camel.sink.url", "seda:test");
+ props.put("topics", "mytopic");
+
+ CamelSinkTask camelsinkTask = new CamelSinkTask();
+ camelsinkTask.start(props);
+
+ String topic = "mytopic";
+ Byte myByte = new Byte("100");
+ Float myFloat = new Float("100");
+ Short myShort = new Short("100");
+ Double myDouble = new Double("100");
+ int myInteger = 100;
+ Long myLong = new Long("100");
+ HashMap<String, String> map = new HashMap<String, String>();
+ map.put("a", "a");
+ HashMap<Integer, String> map1 = new HashMap<Integer, String>();
+ map1.put(1, "a");
+ HashMap<Integer, Integer> map2 = new HashMap<Integer, Integer>();
+ map2.put(1, 1);
+
+ List<SinkRecord> records = new ArrayList<SinkRecord>();
+ SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
+ record.headers().addBoolean("CamelHeaderMyBoolean", true);
+ record.headers().addByte("CamelHeaderMyByte", myByte);
+ record.headers().addFloat("CamelHeaderMyFloat", myFloat);
+ record.headers().addShort("CamelHeaderMyShort", myShort);
+ record.headers().addDouble("CamelHeaderMyDouble", myDouble);
+ record.headers().addInt("CamelHeaderMyInteger", myInteger);
+ record.headers().addLong("CamelHeaderMyLong", myLong);
+ record.headers().addMap("CamelHeaderMyMap", map, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA));
+ record.headers().addMap("CamelHeaderMyMap1", map1, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
+ record.headers().addMap("CamelHeaderMyMap2", map2, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
+ records.add(record);
+ camelsinkTask.put(records);
+
+ ConsumerTemplate c = camelsinkTask.getCms().createConsumerTemplate();
+ Exchange exchange = c.receive("seda:test", 1000L);
+ assertEquals("camel", exchange.getMessage().getBody());
+ assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
+ assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
+ assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
+ assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
+ assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
+ assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
+ assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
+ assertEquals(map, exchange.getIn().getHeader("CamelHeaderMyMap", Map.class));
+ assertEquals(map1, exchange.getIn().getHeader("CamelHeaderMyMap1", Map.class));
+ assertEquals(map2, exchange.getIn().getHeader("CamelHeaderMyMap2", Map.class));
+ camelsinkTask.stop();
+ }
+
+ @Test
+ public void testBodyAndPropertiesHeadersMapMixed() throws JsonProcessingException, InterruptedException {
+ Map<String, String> props = new HashMap<>();
+ props.put("camel.sink.url", "seda:test");
+ props.put("topics", "mytopic");
+
+ CamelSinkTask camelsinkTask = new CamelSinkTask();
+ camelsinkTask.start(props);
+
+ String topic = "mytopic";
+ Byte myByte = new Byte("100");
+ Float myFloat = new Float("100");
+ Short myShort = new Short("100");
+ Double myDouble = new Double("100");
+ int myInteger = 100;
+ Long myLong = new Long("100");
+ HashMap<String, String> map = new HashMap<String, String>();
+ map.put("a", "a");
+ HashMap<Integer, String> map1 = new HashMap<Integer, String>();
+ map1.put(1, "a");
+ HashMap<Integer, Integer> map2 = new HashMap<Integer, Integer>();
+ map2.put(1, 1);
+
+ List<SinkRecord> records = new ArrayList<SinkRecord>();
+ SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
+ record.headers().addBoolean("CamelPropertyMyBoolean", true);
+ record.headers().addByte("CamelPropertyMyByte", myByte);
+ record.headers().addFloat("CamelPropertyMyFloat", myFloat);
+ record.headers().addShort("CamelPropertyMyShort", myShort);
+ record.headers().addDouble("CamelPropertyMyDouble", myDouble);
+ record.headers().addInt("CamelPropertyMyInteger", myInteger);
+ record.headers().addLong("CamelPropertyMyLong", myLong);
+ record.headers().addBoolean("CamelHeaderMyBoolean", true);
+ record.headers().addByte("CamelHeaderMyByte", myByte);
+ record.headers().addFloat("CamelHeaderMyFloat", myFloat);
+ record.headers().addShort("CamelHeaderMyShort", myShort);
+ record.headers().addDouble("CamelHeaderMyDouble", myDouble);
+ record.headers().addInt("CamelHeaderMyInteger", myInteger);
+ record.headers().addLong("CamelHeaderMyLong", myLong);
+ record.headers().addMap("CamelHeaderMyMap", map, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA));
+ record.headers().addMap("CamelHeaderMyMap1", map1, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
+ record.headers().addMap("CamelHeaderMyMap2", map2, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
+ records.add(record);
+ camelsinkTask.put(records);
+
+ ConsumerTemplate c = camelsinkTask.getCms().createConsumerTemplate();
+ Exchange exchange = c.receive("seda:test", 1000L);
+ assertEquals("camel", exchange.getMessage().getBody());
+ assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean"));
+ assertEquals(myByte, (Byte) exchange.getProperties().get("CamelPropertyMyByte"));
+ assertEquals(myFloat, (Float) exchange.getProperties().get("CamelPropertyMyFloat"));
+ assertEquals(myShort, (Short) exchange.getProperties().get("CamelPropertyMyShort"));
+ assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble"));
+ assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger"));
+ assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong"));
+ assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
+ assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
+ assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
+ assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
+ assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
+ assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
+ assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
+ assertEquals(map, exchange.getIn().getHeader("CamelHeaderMyMap", Map.class));
+ assertEquals(map1, exchange.getIn().getHeader("CamelHeaderMyMap1", Map.class));
+ assertEquals(map2, exchange.getIn().getHeader("CamelHeaderMyMap2", Map.class));
+
+ camelsinkTask.stop();
+ }
}