You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/10/08 21:18:46 UTC
[camel-kafka-connector] branch master updated: core: simplify kafka
headers to camel exchange mapping
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3b315a3 core: simplify kafka headers to camel exchange mapping
3b315a3 is described below
commit 3b315a3e4c4e2270460376a422fb094c9f1fde40
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Thu Oct 8 19:27:03 2020 +0200
core: simplify kafka headers to camel exchange mapping
---
.../apache/camel/kafkaconnector/CamelSinkTask.java | 86 ++++------------------
1 file changed, 13 insertions(+), 73 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 63e139c..25d94f1 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -18,9 +18,6 @@ package org.apache.camel.kafkaconnector;
import java.util.Collection;
import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -36,8 +33,6 @@ import org.apache.camel.support.DefaultExchange;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaBuilder;
-import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -130,18 +125,18 @@ public class CamelSinkTask extends SinkTask {
public void put(Collection<SinkRecord> sinkRecords) {
for (SinkRecord record : sinkRecords) {
TaskHelper.logRecordContent(LOG, record, config);
- Map<String, Object> headers = new HashMap<>();
+
Exchange exchange = new DefaultExchange(producer.getCamelContext());
- headers.put(KAFKA_RECORD_KEY_HEADER, record.key());
+ exchange.getMessage().setBody(record.value());
+ exchange.getMessage().setHeader(KAFKA_RECORD_KEY_HEADER, record.key());
+
for (Header header : record.headers()) {
if (header.key().startsWith(HEADER_CAMEL_PREFIX)) {
- addHeader(headers, header);
+ mapHeader(header, HEADER_CAMEL_PREFIX, exchange.getMessage().getHeaders());
} else if (header.key().startsWith(PROPERTY_CAMEL_PREFIX)) {
- addProperty(exchange, header);
+ mapHeader(header, PROPERTY_CAMEL_PREFIX, exchange.getProperties());
}
}
- exchange.getMessage().setHeaders(headers);
- exchange.getMessage().setBody(record.value());
LOG.debug("Sending exchange {} to {}", exchange.getExchangeId(), LOCAL_URL);
producer.send(localEndpoint, exchange);
@@ -172,69 +167,14 @@ public class CamelSinkTask extends SinkTask {
}
}
- private void addHeader(Map<String, Object> map, Header singleHeader) {
- String camelHeaderKey = StringUtils.removeStart(singleHeader.key(), HEADER_CAMEL_PREFIX);
- Schema schema = singleHeader.schema();
-
- if (schema.type().equals(Timestamp.SCHEMA.type()) && Objects.equals(schema.name(), Timestamp.SCHEMA.name())) {
- map.put(camelHeaderKey, (Date)singleHeader.value());
- } else if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
- map.put(camelHeaderKey, (String)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName())) {
- map.put(camelHeaderKey, (Boolean)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(Schema.INT32_SCHEMA.type().getName())) {
- map.put(camelHeaderKey, singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(Schema.BYTES_SCHEMA.type().getName())) {
- if (Decimal.class.getCanonicalName().equals(schema.name())) {
- map.put(camelHeaderKey, Decimal.toLogical(schema, (byte[])singleHeader.value()));
- } else {
- map.put(camelHeaderKey, (byte[])singleHeader.value());
- }
- } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT32_SCHEMA.type().getName())) {
- map.put(camelHeaderKey, (float)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT64_SCHEMA.type().getName())) {
- map.put(camelHeaderKey, (double)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(Schema.INT16_SCHEMA.type().getName())) {
- map.put(camelHeaderKey, (short)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(Schema.INT64_SCHEMA.type().getName())) {
- map.put(camelHeaderKey, (long)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) {
- map.put(camelHeaderKey, (byte)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
- map.put(camelHeaderKey, (Map<?, ?>)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
- map.put(camelHeaderKey, (List<?>)singleHeader.value());
- }
- }
+ private static void mapHeader(Header header, String prefix, Map<String, Object> destination) {
+ final String key = StringUtils.removeStart(header.key(), prefix);
+ final Schema schema = header.schema();
- private void addProperty(Exchange exchange, Header singleHeader) {
- String camelPropertyKey = StringUtils.removeStart(singleHeader.key(), PROPERTY_CAMEL_PREFIX);
- Schema schema = singleHeader.schema();
-
- if (schema.type().equals(Timestamp.SCHEMA.type()) && Objects.equals(schema.name(), Timestamp.SCHEMA.name())) {
- exchange.getProperties().put(camelPropertyKey, (Date)singleHeader.value());
- } else if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
- exchange.getProperties().put(camelPropertyKey, (String)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName())) {
- exchange.getProperties().put(camelPropertyKey, (Boolean)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(Schema.INT32_SCHEMA.type().getName())) {
- exchange.getProperties().put(camelPropertyKey, singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(Schema.BYTES_SCHEMA.type().getName())) {
- exchange.getProperties().put(camelPropertyKey, (byte[])singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT32_SCHEMA.type().getName())) {
- exchange.getProperties().put(camelPropertyKey, (float)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT64_SCHEMA.type().getName())) {
- exchange.getProperties().put(camelPropertyKey, (double)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(Schema.INT16_SCHEMA.type().getName())) {
- exchange.getProperties().put(camelPropertyKey, (short)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(Schema.INT64_SCHEMA.type().getName())) {
- exchange.getProperties().put(camelPropertyKey, (long)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) {
- exchange.getProperties().put(camelPropertyKey, (byte)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
- exchange.getProperties().put(camelPropertyKey, (Map<?, ?>)singleHeader.value());
- } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
- exchange.getProperties().put(camelPropertyKey, (List<?>)singleHeader.value());
+ if (schema.type().equals(Schema.BYTES_SCHEMA.type()) && Objects.equals(schema.name(), Decimal.LOGICAL_NAME)) {
+ destination.put(key, Decimal.toLogical(schema, (byte[]) header.value()));
+ } else {
+ destination.put(key, header.value());
}
}