You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2021/02/23 09:35:42 UTC
[camel-kafka-connector] 01/01: Convert Struct to Map
This is an automated email from the ASF dual-hosted git repository.
oalsafi pushed a commit to branch add-struct-check
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 68dbfb8c1b42aec6ecdce82b0cb4195b359d5ea2
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Tue Feb 23 10:35:27 2021 +0100
Convert Struct to Map
---
.../apache/camel/kafkaconnector/CamelSinkTask.java | 35 ++++++++--
.../camel/kafkaconnector/CamelSinkTaskTest.java | 77 ++++++++++++++++++----
2 files changed, 97 insertions(+), 15 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 82c16d2..1e8fa43 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -18,6 +18,7 @@ package org.apache.camel.kafkaconnector;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -33,7 +34,9 @@ import org.apache.camel.support.DefaultExchange;
import org.apache.camel.util.StringHelper;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -110,7 +113,7 @@ public class CamelSinkTask extends SinkTask {
final String headersRemovePattern = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
mapProperties = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);
mapHeaders = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF);
-
+
CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
remoteUrl = TaskHelper.buildUrl(camelContext,
@@ -175,8 +178,8 @@ public class CamelSinkTask extends SinkTask {
TaskHelper.logRecordContent(LOG, loggingLevel, record);
Exchange exchange = new DefaultExchange(producer.getCamelContext());
- exchange.getMessage().setBody(record.value());
- exchange.getMessage().setHeader(KAFKA_RECORD_KEY_HEADER, record.key());
+ exchange.getMessage().setBody(convertValueFromStruct(record.valueSchema(), record.value()));
+ exchange.getMessage().setHeader(KAFKA_RECORD_KEY_HEADER, convertValueFromStruct(record.keySchema(), record.key()));
for (Header header : record.headers()) {
if (header.key().startsWith(HEADER_CAMEL_PREFIX)) {
@@ -232,8 +235,32 @@ public class CamelSinkTask extends SinkTask {
if (schema.type().equals(Schema.BYTES_SCHEMA.type()) && Objects.equals(schema.name(), Decimal.LOGICAL_NAME)) {
destination.put(key, Decimal.toLogical(schema, (byte[]) header.value()));
} else {
- destination.put(key, header.value());
+ destination.put(key, convertValueFromStruct(header.schema(), header.value()));
+ }
+ }
+
+ private static Object convertValueFromStruct(Schema schema, Object value) {
+ // if we have a schema of type Struct, we convert it to map, otherwise
+ // we just return the value as it
+ if (schema != null && value != null && Schema.Type.STRUCT == schema.type()) {
+ return toMap((Struct) value);
}
+
+ return value;
+ }
+
+ private static Map<String, Object> toMap(final Struct struct) {
+ final HashMap<String, Object> fieldsToValues = new HashMap<>();
+
+ struct.schema().fields().forEach(field -> {
+ try {
+ fieldsToValues.put(field.name(), struct.get(field));
+ } catch (DataException e) {
+ fieldsToValues.put(field.name(), null);
+ }
+ });
+
+ return fieldsToValues;
}
CamelKafkaConnectMain getCms() {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index bab0a5d..0316b2f 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;
@@ -74,6 +75,52 @@ public class CamelSinkTaskTest {
}
@Test
+ public void testStructBody() {
+ Map<String, String> props = new HashMap<>();
+ props.put(TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
+
+ List<SinkRecord> records = new ArrayList<SinkRecord>();
+ Schema keySchema = SchemaBuilder.struct()
+ .name("keySchema")
+ .field("id", Schema.INT32_SCHEMA)
+ .build();
+
+ Schema valueSchema = SchemaBuilder.struct()
+ .name("valueSchema")
+ .field("id", SchemaBuilder.INT32_SCHEMA)
+ .field("name", SchemaBuilder.STRING_SCHEMA)
+ .field("isAdult", SchemaBuilder.BOOLEAN_SCHEMA)
+ .build();
+
+ Struct key = new Struct(keySchema).put("id", 12);
+ Struct value = new Struct(valueSchema)
+ .put("id", 12)
+ .put("name", "jane doe")
+ .put("isAdult", true);
+
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, keySchema, key, valueSchema, value, 42);
+ records.add(record);
+ sinkTask.put(records);
+
+ ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+
+ assertEquals("jane doe", exchange.getMessage().getBody(Map.class).get("name"));
+ assertEquals(12, exchange.getMessage().getBody(Map.class).get("id"));
+ assertTrue((Boolean) exchange.getMessage().getBody(Map.class).get("isAdult"));
+
+ assertEquals(12, ((Map) exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)).get("id"));
+ assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+ .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+
+ sinkTask.stop();
+ }
+
+ @Test
public void testTopicsRegex() {
Map<String, String> props = new HashMap<>();
props.put("topics.regex", "topic1*");
@@ -120,6 +167,12 @@ public class CamelSinkTaskTest {
BigDecimal myBigDecimal = new BigDecimal(1234567890);
Schema schema = Decimal.schema(myBigDecimal.scale());
+ Schema headerStruct = SchemaBuilder.struct()
+ .field("myHeader", Schema.STRING_SCHEMA)
+ .build();
+
+ Struct headerStructValue = new Struct(headerStruct).put("myHeader", "structHeader");
+
List<SinkRecord> records = new ArrayList<SinkRecord>();
SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBoolean", true);
@@ -130,6 +183,7 @@ public class CamelSinkTaskTest {
record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", myInteger);
record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", myLong);
record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBigDecimal", Decimal.fromLogical(schema, myBigDecimal), schema);
+ record.headers().addStruct(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyStruct", headerStructValue);
records.add(record);
sinkTask.put(records);
@@ -145,10 +199,11 @@ public class CamelSinkTaskTest {
assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
+ assertEquals("structHeader", exchange.getIn().getHeader("MyStruct", Map.class).get("myHeader"));
sinkTask.stop();
}
-
+
@Test
public void testBodyAndHeadersExclusions() {
Map<String, String> props = new HashMap<>();
@@ -196,7 +251,7 @@ public class CamelSinkTaskTest {
sinkTask.stop();
}
-
+
@Test
public void testBodyAndHeadersExclusionsRegex() {
Map<String, String> props = new HashMap<>();
@@ -671,7 +726,7 @@ public class CamelSinkTaskTest {
sinkTask.stop();
}
-
+
@Test
public void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() {
Map<String, String> props = new HashMap<>();
@@ -730,7 +785,7 @@ public class CamelSinkTaskTest {
sinkTask.stop();
}
-
+
@Test
public void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() {
Map<String, String> props = new HashMap<>();
@@ -879,7 +934,7 @@ public class CamelSinkTaskTest {
sinkTask.stop();
}
-
+
@Test
public void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException {
Map<String, String> props = new HashMap<>();
@@ -933,7 +988,7 @@ public class CamelSinkTaskTest {
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
+
exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel5 camel6 camel7 camel8 camel9", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -942,7 +997,7 @@ public class CamelSinkTaskTest {
sinkTask.stop();
}
-
+
@Test
public void testWithIdempotency() throws InterruptedException {
Map<String, String> props = new HashMap<>();
@@ -993,13 +1048,13 @@ public class CamelSinkTaskTest {
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
+
exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel1", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
+
exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel2", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -1008,7 +1063,7 @@ public class CamelSinkTaskTest {
sinkTask.stop();
}
-
+
@Test
public void testWithIdempotencyAndHeader() throws InterruptedException {
Map<String, String> props = new HashMap<>();
@@ -1040,7 +1095,7 @@ public class CamelSinkTaskTest {
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-
+
exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel1", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));