You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/04/08 13:45:56 UTC

[camel-kafka-connector] branch master updated: FieldsToHeadersTransform: added option to copy whole value/key related to issue #988

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 43ef44d  FieldsToHeadersTransform: added option to copy whole value/key related to issue #988
43ef44d is described below

commit 43ef44de8130d9878131163c8cee021df4dadddd
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Wed Apr 7 15:39:29 2021 +0200

    FieldsToHeadersTransform: added option to copy whole value/key related to issue #988
---
 .../transforms/FieldsToHeadersTransform.java       | 54 +++++++++++++------
 .../transforms/FieldsToHeadersTransformTest.java   | 62 +++++++++++++++++++++-
 .../ROOT/pages/single-message-transform.adoc       |  8 +--
 .../ROOT/pages/transformers/fieldsToHeaders.adoc   | 29 +++++++++-
 4 files changed, 132 insertions(+), 21 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java
index ed74886..72c8f5f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java
@@ -43,30 +43,32 @@ public abstract class FieldsToHeadersTransform<R extends ConnectRecord<R>> imple
     private static final String HEADERS_CONFIG = "headers";
 
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Fields names to extract and set to headers")
+            .define(FIELDS_CONFIG, ConfigDef.Type.LIST, new ArrayList<>(), ConfigDef.Importance.MEDIUM, "Fields names to extract and set to headers")
             .define(HEADERS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Headers names to set with extracted fields");
 
-
     private List<String> fields;
 
     private List<String> headers;
 
-
-
     protected abstract Schema operatingSchema(R record);
 
     protected abstract Object operatingValue(R record);
 
-
     @Override
     public R apply(R r) {
         RecordValue value = createRecordValue(r);
         Schema currentSchema;
         Object currentValue;
-        for (int i = 0; i < fields.size(); i++) {
-            currentSchema = value.getFieldSchema(fields.get(i));
-            currentValue = value.getFieldValue(fields.get(i));
-            r.headers().add(headers.get(i), currentValue, currentSchema);
+        if (fields.isEmpty()) {
+            currentSchema = value.getFieldSchema("");
+            currentValue = value.getFieldValue("");
+            r.headers().add(headers.get(0), currentValue, currentSchema);
+        } else {
+            for (int i = 0; i < fields.size(); i++) {
+                currentSchema = value.getFieldSchema(fields.get(i));
+                currentValue = value.getFieldValue(fields.get(i));
+                r.headers().add(headers.get(i), currentValue, currentSchema);
+            }
         }
         return r;
     }
@@ -90,26 +92,30 @@ public abstract class FieldsToHeadersTransform<R extends ConnectRecord<R>> imple
 
     private void validateConfig() {
 
-        boolean validFields  = fields.stream().allMatch(nef -> nef != null && !nef.trim().isEmpty());
+        boolean validFields  = fields.stream().allMatch(nef -> nef != null);
         boolean validHeaders = headers.stream().allMatch(nef -> nef != null && !nef.trim().isEmpty());
 
         if (!(validFields && validHeaders)) {
-            throw new IllegalArgumentException("headers and fields configuration properties cannot be null or contain empty elements.");
+            throw new IllegalArgumentException("fields configuration property cannot be null (can be an empty string if you want the whole value/key), headers configuration property cannot be null or contain empty elements.");
         }
-        if (fields.size() > headers.size()) {
+        if (fields.size() != 0 && fields.size() > headers.size()) {
             String fieldsWithoutCorrespondingHeaders = fields.subList(headers.size(), fields.size()).stream().collect(Collectors.joining(","));
             throw new IllegalArgumentException("There is no corresponding header(s) configured for the following field(s): " + fieldsWithoutCorrespondingHeaders);
         }
-        if (headers.size() > fields.size()) {
+        if (fields.size() != 0 && headers.size() > fields.size()) {
             String headersWithoutCorrespondingFields = headers.subList(fields.size(), headers.size()).stream().collect(Collectors.joining(","));
             LOG.warn("There is no corresponding header(s) for the following field(s): {} ", headersWithoutCorrespondingFields);
         }
-
+        if (fields.size() == 0 && headers.size() > 1) {
+            LOG.warn("Fields are empty and there are more than 1 header it means whole value/key will put in the first header of this list: {} ", headers.stream().collect(Collectors.joining(",")));
+        }
     }
 
-
     private RecordValue createRecordValue(R r) {
         final Schema schema = operatingSchema(r);
+        if (fields.isEmpty()) {
+            return new WholeRecordValue(operatingValue(r), schema);
+        }
         if (schema == null) {
             return new MapRecordValue(requireMapOrNull(operatingValue(r), PURPOSE));
         }
@@ -149,6 +155,24 @@ public abstract class FieldsToHeadersTransform<R extends ConnectRecord<R>> imple
         Schema getFieldSchema(String fieldName);
     }
 
+    public class WholeRecordValue implements RecordValue {
+        private Object value;
+        private Schema schema;
+
+        public WholeRecordValue(Object value, Schema schema) {
+            this.value = value;
+            this.schema = schema;
+        }
+
+        public Object getFieldValue(String fieldName) {
+            return value;
+        }
+
+        public Schema getFieldSchema(String fieldName) {
+            return schema;
+        }
+    }
+
     public class MapRecordValue implements RecordValue {
 
         private Map<String, Object> map;
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransformTest.java
index a954124..1fadf91 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransformTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransformTest.java
@@ -62,6 +62,36 @@ public class FieldsToHeadersTransformTest {
     }
 
     @Test
+    public void testWholeKey() {
+        FieldsToHeadersTransform.Key fieldsToHeadersTransform = new FieldsToHeadersTransform.Key();
+        Map<String, String> conf = new HashMap<>();
+        conf.put("headers", "camel.kafka.KEY");
+        fieldsToHeadersTransform.configure(conf);
+        ConnectRecord transformedCr =  fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", null, 100, null, null));
+        assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value());
+    }
+
+    @Test
+    public void testWholeKeyWithSchema() {
+        FieldsToHeadersTransform.Key fieldsToHeadersTransform = new FieldsToHeadersTransform.Key();
+        Map<String, String> conf = new HashMap<>();
+        conf.put("headers", "camel.kafka.KEY");
+        fieldsToHeadersTransform.configure(conf);
+        ConnectRecord transformedCr =  fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", INT32_SCHEMA, 100, null, null));
+        assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value());
+    }
+
+    @Test
+    public void testWholeKeyMultipleHeaders() {
+        FieldsToHeadersTransform.Key fieldsToHeadersTransform = new FieldsToHeadersTransform.Key();
+        Map<String, String> conf = new HashMap<>();
+        conf.put("headers", "camel.kafka.KEY,should.not.be.set");
+        fieldsToHeadersTransform.configure(conf);
+        ConnectRecord transformedCr =  fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", null, 100, null, null));
+        assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value());
+    }
+
+    @Test
     public void testValueWithSchema() {
         FieldsToHeadersTransform.Value fieldsToHeadersTransform = new FieldsToHeadersTransform.Value();
         ConnectRecord transformedCr = testWithSchema(fieldsToHeadersTransform, (schema, value) -> new SourceRecord(null, null, "testTopic", schema, value));
@@ -112,6 +142,36 @@ public class FieldsToHeadersTransformTest {
     }
 
     @Test
+    public void testWholeValue() {
+        FieldsToHeadersTransform.Value fieldsToHeadersTransform = new FieldsToHeadersTransform.Value();
+        Map<String, String> conf = new HashMap<>();
+        conf.put("headers", "camel.kafka.KEY");
+        fieldsToHeadersTransform.configure(conf);
+        ConnectRecord transformedCr =  fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", null, null, null, 100));
+        assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value());
+    }
+
+    @Test
+    public void testWholeValueMultipleHeaders() {
+        FieldsToHeadersTransform.Value fieldsToHeadersTransform = new FieldsToHeadersTransform.Value();
+        Map<String, String> conf = new HashMap<>();
+        conf.put("headers", "camel.kafka.KEY,should.not.be.set");
+        fieldsToHeadersTransform.configure(conf);
+        ConnectRecord transformedCr =  fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", null, null, null, 100));
+        assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value());
+    }
+
+    @Test
+    public void testWholeValueWithSchema() {
+        FieldsToHeadersTransform.Value fieldsToHeadersTransform = new FieldsToHeadersTransform.Value();
+        Map<String, String> conf = new HashMap<>();
+        conf.put("headers", "camel.kafka.KEY");
+        fieldsToHeadersTransform.configure(conf);
+        ConnectRecord transformedCr =  fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", null, null, INT32_SCHEMA, 100));
+        assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value());
+    }
+
+    @Test
     public void fieldsWithoutCorrespondingHeadersTest() {
         Map<String, String> conf = new HashMap<>();
         conf.put("fields", "FROM,TO,CC,SUBJECT,BODY");
@@ -130,7 +190,7 @@ public class FieldsToHeadersTransformTest {
     }
 
     @Test
-    public void headersWithoutCorrespondingFieldssTest() {
+    public void headersWithoutCorrespondingFieldsTest() {
         Map<String, String> conf = new HashMap<>();
         conf.put("fields", "FROM");
         conf.put("headers", "from,to,cc,subject,body");
diff --git a/docs/modules/ROOT/pages/single-message-transform.adoc b/docs/modules/ROOT/pages/single-message-transform.adoc
index 1b2a0a8..4ed18af 100644
--- a/docs/modules/ROOT/pages/single-message-transform.adoc
+++ b/docs/modules/ROOT/pages/single-message-transform.adoc
@@ -10,14 +10,14 @@ SMTs transform inbound messages after a source connector has produced them,
 but before they are written to Kafka. SMTs transform outbound messages before they
 are sent to a sink connector. The following SMTs are available for use with Kafka Connect.
 
-In addition to the https://docs.confluent.io/current/connect/transforms/index.html?_ga=2.114132679.1300749793.1612453936-340332144.1608111036&_gac=1.228771694.1611682559.CjwKCAiAxp-ABhALEiwAXm6IyeFH3rDEIpDWXGQaoibWgcjXxdz8YLkAbi3n8O-quiuWzdAeNezaqhoCdmgQAvD_BwE[prebuilt transformations]#,#
-Camel Kafka Connector provides additionals SMTs:
+In addition to the https://kafka.apache.org/documentation/#connect_transforms[prebuilt transformations],
+Camel Kafka Connector provides additional SMTs:
 
 [cols="^,^ ", options="header"]
 |===
 |Transform | Description
 
 |xref:transformers/fieldsToHeaders.adoc[FieldsToHeaders]
-|Extract fields from a configured record value (struct, schema and map are supported)
-and copy the value in configured headers
+|Extract fields from a Kafka Connect record's value or key and copy them into the configured headers (struct + schema, map, or copying the whole value/key are the supported options)
+
 |===
diff --git a/docs/modules/ROOT/pages/transformers/fieldsToHeaders.adoc b/docs/modules/ROOT/pages/transformers/fieldsToHeaders.adoc
index 4ebc2d1..2fbd0c2 100644
--- a/docs/modules/ROOT/pages/transformers/fieldsToHeaders.adoc
+++ b/docs/modules/ROOT/pages/transformers/fieldsToHeaders.adoc
@@ -17,6 +17,8 @@ field 'a' goes to header 'X', field 'b' goes to header 'Y' etc...
 
 Any null values are passed through unmodified.
 
+If `transforms.fieldsToHeadersTransform.fields` is empty or omitted, the whole value/key will be copied into the first header configured in `transforms.fieldsToHeadersTransform.headers`.
+
 Use the concrete transformation type designed for the record key
 (`org.apache.camel.kafkaconnector.transforms.FieldsToHeadersTransform$Key`) or value
 (`org.apache.camel.kafkaconnector.transforms.FieldsToHeadersTransform$Value`).
@@ -25,6 +27,8 @@ Use the concrete transformation type designed for the record key
 
 The following examples show how to use FieldsToHeaders.
 
+=== Given the following configuration:
+
 [source,java-properties]
 ----
 transforms=FieldsToHeadersTransform
@@ -32,7 +36,7 @@ transforms.fieldsToHeadersTransform.type=org.apache.camel.kafkaconnector.transfo
 transforms.fieldsToHeadersTransform.fields=FROM,TO,CC,SUBJECT
 transforms.fieldsToHeadersTransform.headers=from,to,cc,subject
 ----
-Given the following message:
+and the following message:
 
 [source,json]
 ----
@@ -58,6 +62,29 @@ subject: "Needs Attention!"
    body: "Tere is an issue that needs your attention!"
 ----
 
+=== Given the following configuration:
+
+[source,java-properties]
+----
+transforms=FieldsToHeadersTransform
+transforms.fieldsToHeadersTransform.type=org.apache.camel.kafkaconnector.transforms.FieldsToHeadersTransform$Key
+transforms.fieldsToHeadersTransform.headers=camel.kafka.KEY
+----
+and the following message:
+
+[source]
+----
+kafka record with:
+    value: "my message"
+    key: 100
+----
+the following header will be added to the Kafka ConnectRecord object:
+
+[source]
+----
+   camel.kafka.KEY: 100
+----
+
 == Properties
 
 [cols="^,^ ", options="header"]