You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/10/08 21:18:46 UTC

[camel-kafka-connector] branch master updated: core: simplify kafka headers to camel exchange mapping

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b315a3  core: simplify kafka headers to camel exchange mapping
3b315a3 is described below

commit 3b315a3e4c4e2270460376a422fb094c9f1fde40
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Thu Oct 8 19:27:03 2020 +0200

    core: simplify kafka headers to camel exchange mapping
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java | 86 ++++------------------
 1 file changed, 13 insertions(+), 73 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 63e139c..25d94f1 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -18,9 +18,6 @@ package org.apache.camel.kafkaconnector;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -36,8 +33,6 @@ import org.apache.camel.support.DefaultExchange;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaBuilder;
-import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -130,18 +125,18 @@ public class CamelSinkTask extends SinkTask {
     public void put(Collection<SinkRecord> sinkRecords) {
         for (SinkRecord record : sinkRecords) {
             TaskHelper.logRecordContent(LOG, record, config);
-            Map<String, Object> headers = new HashMap<>();
+
             Exchange exchange = new DefaultExchange(producer.getCamelContext());
-            headers.put(KAFKA_RECORD_KEY_HEADER, record.key());
+            exchange.getMessage().setBody(record.value());
+            exchange.getMessage().setHeader(KAFKA_RECORD_KEY_HEADER, record.key());
+
             for (Header header : record.headers()) {
                 if (header.key().startsWith(HEADER_CAMEL_PREFIX)) {
-                    addHeader(headers, header);
+                    mapHeader(header, HEADER_CAMEL_PREFIX, exchange.getMessage().getHeaders());
                 } else if (header.key().startsWith(PROPERTY_CAMEL_PREFIX)) {
-                    addProperty(exchange, header);
+                    mapHeader(header, PROPERTY_CAMEL_PREFIX, exchange.getProperties());
                 }
             }
-            exchange.getMessage().setHeaders(headers);
-            exchange.getMessage().setBody(record.value());
 
             LOG.debug("Sending exchange {} to {}", exchange.getExchangeId(), LOCAL_URL);
             producer.send(localEndpoint, exchange);
@@ -172,69 +167,14 @@ public class CamelSinkTask extends SinkTask {
         }
     }
 
-    private void addHeader(Map<String, Object> map, Header singleHeader) {
-        String camelHeaderKey = StringUtils.removeStart(singleHeader.key(), HEADER_CAMEL_PREFIX);
-        Schema schema = singleHeader.schema();
-
-        if (schema.type().equals(Timestamp.SCHEMA.type()) && Objects.equals(schema.name(), Timestamp.SCHEMA.name())) {
-            map.put(camelHeaderKey, (Date)singleHeader.value());
-        } else if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
-            map.put(camelHeaderKey, (String)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName())) {
-            map.put(camelHeaderKey, (Boolean)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.INT32_SCHEMA.type().getName())) {
-            map.put(camelHeaderKey, singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.BYTES_SCHEMA.type().getName())) {
-            if (Decimal.class.getCanonicalName().equals(schema.name())) {
-                map.put(camelHeaderKey, Decimal.toLogical(schema, (byte[])singleHeader.value()));
-            } else {
-                map.put(camelHeaderKey, (byte[])singleHeader.value());
-            }
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT32_SCHEMA.type().getName())) {
-            map.put(camelHeaderKey, (float)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT64_SCHEMA.type().getName())) {
-            map.put(camelHeaderKey, (double)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.INT16_SCHEMA.type().getName())) {
-            map.put(camelHeaderKey, (short)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.INT64_SCHEMA.type().getName())) {
-            map.put(camelHeaderKey, (long)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) {
-            map.put(camelHeaderKey, (byte)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
-            map.put(camelHeaderKey, (Map<?, ?>)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
-            map.put(camelHeaderKey, (List<?>)singleHeader.value());
-        }
-    }
+    private static void mapHeader(Header header, String prefix, Map<String, Object> destination) {
+        final String key = StringUtils.removeStart(header.key(), prefix);
+        final Schema schema = header.schema();
 
-    private void addProperty(Exchange exchange, Header singleHeader) {
-        String camelPropertyKey = StringUtils.removeStart(singleHeader.key(), PROPERTY_CAMEL_PREFIX);
-        Schema schema = singleHeader.schema();
-
-        if (schema.type().equals(Timestamp.SCHEMA.type()) && Objects.equals(schema.name(), Timestamp.SCHEMA.name())) {
-            exchange.getProperties().put(camelPropertyKey, (Date)singleHeader.value());
-        } else if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
-            exchange.getProperties().put(camelPropertyKey, (String)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName())) {
-            exchange.getProperties().put(camelPropertyKey, (Boolean)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.INT32_SCHEMA.type().getName())) {
-            exchange.getProperties().put(camelPropertyKey, singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.BYTES_SCHEMA.type().getName())) {
-            exchange.getProperties().put(camelPropertyKey, (byte[])singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT32_SCHEMA.type().getName())) {
-            exchange.getProperties().put(camelPropertyKey, (float)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT64_SCHEMA.type().getName())) {
-            exchange.getProperties().put(camelPropertyKey, (double)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.INT16_SCHEMA.type().getName())) {
-            exchange.getProperties().put(camelPropertyKey, (short)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.INT64_SCHEMA.type().getName())) {
-            exchange.getProperties().put(camelPropertyKey, (long)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) {
-            exchange.getProperties().put(camelPropertyKey, (byte)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
-            exchange.getProperties().put(camelPropertyKey, (Map<?, ?>)singleHeader.value());
-        } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
-            exchange.getProperties().put(camelPropertyKey, (List<?>)singleHeader.value());
+        if (schema.type().equals(Schema.BYTES_SCHEMA.type()) && Objects.equals(schema.name(), Decimal.LOGICAL_NAME)) {
+            destination.put(key, Decimal.toLogical(schema, (byte[]) header.value()));
+        } else {
+            destination.put(key, header.value());
         }
     }