You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2021/02/23 09:35:42 UTC

[camel-kafka-connector] 01/01: Convert Struct to Map

This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch add-struct-check
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 68dbfb8c1b42aec6ecdce82b0cb4195b359d5ea2
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Tue Feb 23 10:35:27 2021 +0100

    Convert Struct to Map
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java | 35 ++++++++--
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 77 ++++++++++++++++++----
 2 files changed, 97 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 82c16d2..1e8fa43 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -18,6 +18,7 @@ package org.apache.camel.kafkaconnector;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
@@ -33,7 +34,9 @@ import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.util.StringHelper;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -110,7 +113,7 @@ public class CamelSinkTask extends SinkTask {
             final String headersRemovePattern = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
             mapProperties = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);
             mapHeaders = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF);
-            
+
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
                 remoteUrl = TaskHelper.buildUrl(camelContext,
@@ -175,8 +178,8 @@ public class CamelSinkTask extends SinkTask {
             TaskHelper.logRecordContent(LOG, loggingLevel, record);
 
             Exchange exchange = new DefaultExchange(producer.getCamelContext());
-            exchange.getMessage().setBody(record.value());
-            exchange.getMessage().setHeader(KAFKA_RECORD_KEY_HEADER, record.key());
+            exchange.getMessage().setBody(convertValueFromStruct(record.valueSchema(), record.value()));
+            exchange.getMessage().setHeader(KAFKA_RECORD_KEY_HEADER, convertValueFromStruct(record.keySchema(), record.key()));
 
             for (Header header : record.headers()) {
                 if (header.key().startsWith(HEADER_CAMEL_PREFIX)) {
@@ -232,8 +235,32 @@ public class CamelSinkTask extends SinkTask {
         if (schema.type().equals(Schema.BYTES_SCHEMA.type()) && Objects.equals(schema.name(), Decimal.LOGICAL_NAME)) {
             destination.put(key, Decimal.toLogical(schema, (byte[]) header.value()));
         } else {
-            destination.put(key, header.value());
+            destination.put(key, convertValueFromStruct(header.schema(), header.value()));
+        }
+    }
+
+    private static Object convertValueFromStruct(Schema schema, Object value) {
+        // if we have a schema of type Struct, we convert it to map, otherwise
+        // we just return the value as it
+        if (schema != null && value != null && Schema.Type.STRUCT == schema.type()) {
+            return toMap((Struct) value);
         }
+
+        return value;
+    }
+
+    private static Map<String, Object> toMap(final Struct struct) {
+        final HashMap<String, Object> fieldsToValues = new HashMap<>();
+
+        struct.schema().fields().forEach(field -> {
+            try {
+                fieldsToValues.put(field.name(), struct.get(field));
+            } catch (DataException e) {
+                fieldsToValues.put(field.name(), null);
+            }
+        });
+
+        return fieldsToValues;
     }
 
     CamelKafkaConnectMain getCms() {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index bab0a5d..0316b2f 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.Test;
@@ -74,6 +75,52 @@ public class CamelSinkTaskTest {
     }
 
     @Test
+    public void testStructBody() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        Schema keySchema = SchemaBuilder.struct()
+                .name("keySchema")
+                .field("id", Schema.INT32_SCHEMA)
+                .build();
+
+        Schema valueSchema = SchemaBuilder.struct()
+                .name("valueSchema")
+                .field("id", SchemaBuilder.INT32_SCHEMA)
+                .field("name", SchemaBuilder.STRING_SCHEMA)
+                .field("isAdult", SchemaBuilder.BOOLEAN_SCHEMA)
+                .build();
+
+        Struct key = new Struct(keySchema).put("id", 12);
+        Struct value = new Struct(valueSchema)
+                .put("id", 12)
+                .put("name", "jane doe")
+                .put("isAdult", true);
+
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, keySchema, key, valueSchema, value, 42);
+        records.add(record);
+        sinkTask.put(records);
+
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+
+        assertEquals("jane doe", exchange.getMessage().getBody(Map.class).get("name"));
+        assertEquals(12, exchange.getMessage().getBody(Map.class).get("id"));
+        assertTrue((Boolean) exchange.getMessage().getBody(Map.class).get("isAdult"));
+
+        assertEquals(12, ((Map) exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)).get("id"));
+        assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+                .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+
+        sinkTask.stop();
+    }
+
+    @Test
     public void testTopicsRegex() {
         Map<String, String> props = new HashMap<>();
         props.put("topics.regex", "topic1*");
@@ -120,6 +167,12 @@ public class CamelSinkTaskTest {
         BigDecimal myBigDecimal = new BigDecimal(1234567890);
         Schema schema = Decimal.schema(myBigDecimal.scale());
 
+        Schema headerStruct = SchemaBuilder.struct()
+                .field("myHeader", Schema.STRING_SCHEMA)
+                .build();
+
+        Struct headerStructValue = new Struct(headerStruct).put("myHeader", "structHeader");
+
         List<SinkRecord> records = new ArrayList<SinkRecord>();
         SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
         record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBoolean", true);
@@ -130,6 +183,7 @@ public class CamelSinkTaskTest {
         record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", myInteger);
         record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", myLong);
         record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBigDecimal", Decimal.fromLogical(schema, myBigDecimal), schema);
+        record.headers().addStruct(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyStruct", headerStructValue);
         records.add(record);
         sinkTask.put(records);
 
@@ -145,10 +199,11 @@ public class CamelSinkTaskTest {
         assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
+        assertEquals("structHeader", exchange.getIn().getHeader("MyStruct", Map.class).get("myHeader"));
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testBodyAndHeadersExclusions() {
         Map<String, String> props = new HashMap<>();
@@ -196,7 +251,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testBodyAndHeadersExclusionsRegex() {
         Map<String, String> props = new HashMap<>();
@@ -671,7 +726,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() {
         Map<String, String> props = new HashMap<>();
@@ -730,7 +785,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() {
         Map<String, String> props = new HashMap<>();
@@ -879,7 +934,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
@@ -933,7 +988,7 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-        
+
         exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel5 camel6 camel7 camel8 camel9", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -942,7 +997,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testWithIdempotency() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
@@ -993,13 +1048,13 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-        
+
         exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel1", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-        
+
         exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel2", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -1008,7 +1063,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testWithIdempotencyAndHeader() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
@@ -1040,7 +1095,7 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-        
+
         exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel1", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));