You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/04/08 13:45:56 UTC
[camel-kafka-connector] branch master updated:
FieldsToHeadersTransform: added option to copy whole value/key related to
issue #988
This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 43ef44d FieldsToHeadersTransform: added option to copy whole value/key related to issue #988
43ef44d is described below
commit 43ef44de8130d9878131163c8cee021df4dadddd
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Wed Apr 7 15:39:29 2021 +0200
FieldsToHeadersTransform: added option to copy whole value/key related to issue #988
---
.../transforms/FieldsToHeadersTransform.java | 54 +++++++++++++------
.../transforms/FieldsToHeadersTransformTest.java | 62 +++++++++++++++++++++-
.../ROOT/pages/single-message-transform.adoc | 8 +--
.../ROOT/pages/transformers/fieldsToHeaders.adoc | 29 +++++++++-
4 files changed, 132 insertions(+), 21 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java
index ed74886..72c8f5f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java
@@ -43,30 +43,32 @@ public abstract class FieldsToHeadersTransform<R extends ConnectRecord<R>> imple
private static final String HEADERS_CONFIG = "headers";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Fields names to extract and set to headers")
+ .define(FIELDS_CONFIG, ConfigDef.Type.LIST, new ArrayList<>(), ConfigDef.Importance.MEDIUM, "Fields names to extract and set to headers")
.define(HEADERS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Headers names to set with extracted fields");
-
private List<String> fields;
private List<String> headers;
-
-
protected abstract Schema operatingSchema(R record);
protected abstract Object operatingValue(R record);
-
@Override
public R apply(R r) {
RecordValue value = createRecordValue(r);
Schema currentSchema;
Object currentValue;
- for (int i = 0; i < fields.size(); i++) {
- currentSchema = value.getFieldSchema(fields.get(i));
- currentValue = value.getFieldValue(fields.get(i));
- r.headers().add(headers.get(i), currentValue, currentSchema);
+ if (fields.isEmpty()) {
+ currentSchema = value.getFieldSchema("");
+ currentValue = value.getFieldValue("");
+ r.headers().add(headers.get(0), currentValue, currentSchema);
+ } else {
+ for (int i = 0; i < fields.size(); i++) {
+ currentSchema = value.getFieldSchema(fields.get(i));
+ currentValue = value.getFieldValue(fields.get(i));
+ r.headers().add(headers.get(i), currentValue, currentSchema);
+ }
}
return r;
}
@@ -90,26 +92,30 @@ public abstract class FieldsToHeadersTransform<R extends ConnectRecord<R>> imple
private void validateConfig() {
- boolean validFields = fields.stream().allMatch(nef -> nef != null && !nef.trim().isEmpty());
+ boolean validFields = fields.stream().allMatch(nef -> nef != null);
boolean validHeaders = headers.stream().allMatch(nef -> nef != null && !nef.trim().isEmpty());
if (!(validFields && validHeaders)) {
- throw new IllegalArgumentException("headers and fields configuration properties cannot be null or contain empty elements.");
+ throw new IllegalArgumentException("fields configuration property cannot be null (can be an empty string if you want the whole value/key), headers configuration property cannot be null or contain empty elements.");
}
- if (fields.size() > headers.size()) {
+ if (fields.size() != 0 && fields.size() > headers.size()) {
String fieldsWithoutCorrespondingHeaders = fields.subList(headers.size(), fields.size()).stream().collect(Collectors.joining(","));
throw new IllegalArgumentException("There is no corresponding header(s) configured for the following field(s): " + fieldsWithoutCorrespondingHeaders);
}
- if (headers.size() > fields.size()) {
+ if (fields.size() != 0 && headers.size() > fields.size()) {
String headersWithoutCorrespondingFields = headers.subList(fields.size(), headers.size()).stream().collect(Collectors.joining(","));
LOG.warn("There is no corresponding header(s) for the following field(s): {} ", headersWithoutCorrespondingFields);
}
-
+ if (fields.size() == 0 && headers.size() > 1) {
+ LOG.warn("Fields are empty and there are more than 1 header it means whole value/key will put in the first header of this list: {} ", headers.stream().collect(Collectors.joining(",")));
+ }
}
-
private RecordValue createRecordValue(R r) {
final Schema schema = operatingSchema(r);
+ if (fields.isEmpty()) {
+ return new WholeRecordValue(operatingValue(r), schema);
+ }
if (schema == null) {
return new MapRecordValue(requireMapOrNull(operatingValue(r), PURPOSE));
}
@@ -149,6 +155,24 @@ public abstract class FieldsToHeadersTransform<R extends ConnectRecord<R>> imple
Schema getFieldSchema(String fieldName);
}
+ public class WholeRecordValue implements RecordValue {
+ private Object value;
+ private Schema schema;
+
+ public WholeRecordValue(Object value, Schema schema) {
+ this.value = value;
+ this.schema = schema;
+ }
+
+ public Object getFieldValue(String fieldName) {
+ return value;
+ }
+
+ public Schema getFieldSchema(String fieldName) {
+ return schema;
+ }
+ }
+
public class MapRecordValue implements RecordValue {
private Map<String, Object> map;
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransformTest.java
index a954124..1fadf91 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransformTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransformTest.java
@@ -62,6 +62,36 @@ public class FieldsToHeadersTransformTest {
}
@Test
+ public void testWholeKey() {
+ FieldsToHeadersTransform.Key fieldsToHeadersTransform = new FieldsToHeadersTransform.Key();
+ Map<String, String> conf = new HashMap<>();
+ conf.put("headers", "camel.kafka.KEY");
+ fieldsToHeadersTransform.configure(conf);
+ ConnectRecord transformedCr = fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", null, 100, null, null));
+ assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value());
+ }
+
+ @Test
+ public void testWholeKeyWithSchema() {
+ FieldsToHeadersTransform.Key fieldsToHeadersTransform = new FieldsToHeadersTransform.Key();
+ Map<String, String> conf = new HashMap<>();
+ conf.put("headers", "camel.kafka.KEY");
+ fieldsToHeadersTransform.configure(conf);
+ ConnectRecord transformedCr = fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", INT32_SCHEMA, 100, null, null));
+ assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value());
+ }
+
+ @Test
+ public void testWholeKeyMultipleHeaders() {
+ FieldsToHeadersTransform.Key fieldsToHeadersTransform = new FieldsToHeadersTransform.Key();
+ Map<String, String> conf = new HashMap<>();
+ conf.put("headers", "camel.kafka.KEY,should.not.be.set");
+ fieldsToHeadersTransform.configure(conf);
+ ConnectRecord transformedCr = fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", null, 100, null, null));
+ assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value());
+ }
+
+ @Test
public void testValueWithSchema() {
FieldsToHeadersTransform.Value fieldsToHeadersTransform = new FieldsToHeadersTransform.Value();
ConnectRecord transformedCr = testWithSchema(fieldsToHeadersTransform, (schema, value) -> new SourceRecord(null, null, "testTopic", schema, value));
@@ -112,6 +142,36 @@ public class FieldsToHeadersTransformTest {
}
@Test
+ public void testWholeValue() {
+ FieldsToHeadersTransform.Value fieldsToHeadersTransform = new FieldsToHeadersTransform.Value();
+ Map<String, String> conf = new HashMap<>();
+ conf.put("headers", "camel.kafka.KEY");
+ fieldsToHeadersTransform.configure(conf);
+ ConnectRecord transformedCr = fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", null, null, null, 100));
+ assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value());
+ }
+
+ @Test
+ public void testWholeValueMultipleHeaders() {
+ FieldsToHeadersTransform.Value fieldsToHeadersTransform = new FieldsToHeadersTransform.Value();
+ Map<String, String> conf = new HashMap<>();
+ conf.put("headers", "camel.kafka.KEY,should.not.be.set");
+ fieldsToHeadersTransform.configure(conf);
+ ConnectRecord transformedCr = fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", null, null, null, 100));
+ assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value());
+ }
+
+ @Test
+ public void testWholeValueWithSchema() {
+ FieldsToHeadersTransform.Value fieldsToHeadersTransform = new FieldsToHeadersTransform.Value();
+ Map<String, String> conf = new HashMap<>();
+ conf.put("headers", "camel.kafka.KEY");
+ fieldsToHeadersTransform.configure(conf);
+ ConnectRecord transformedCr = fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", null, null, INT32_SCHEMA, 100));
+ assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value());
+ }
+
+ @Test
public void fieldsWithoutCorrespondingHeadersTest() {
Map<String, String> conf = new HashMap<>();
conf.put("fields", "FROM,TO,CC,SUBJECT,BODY");
@@ -130,7 +190,7 @@ public class FieldsToHeadersTransformTest {
}
@Test
- public void headersWithoutCorrespondingFieldssTest() {
+ public void headersWithoutCorrespondingFieldsTest() {
Map<String, String> conf = new HashMap<>();
conf.put("fields", "FROM");
conf.put("headers", "from,to,cc,subject,body");
diff --git a/docs/modules/ROOT/pages/single-message-transform.adoc b/docs/modules/ROOT/pages/single-message-transform.adoc
index 1b2a0a8..4ed18af 100644
--- a/docs/modules/ROOT/pages/single-message-transform.adoc
+++ b/docs/modules/ROOT/pages/single-message-transform.adoc
@@ -10,14 +10,14 @@ SMTs transform inbound messages after a source connector has produced them,
but before they are written to Kafka. SMTs transform outbound messages before they
are sent to a sink connector. The following SMTs are available for use with Kafka Connect.
-In addition to the https://docs.confluent.io/current/connect/transforms/index.html?_ga=2.114132679.1300749793.1612453936-340332144.1608111036&_gac=1.228771694.1611682559.CjwKCAiAxp-ABhALEiwAXm6IyeFH3rDEIpDWXGQaoibWgcjXxdz8YLkAbi3n8O-quiuWzdAeNezaqhoCdmgQAvD_BwE[prebuilt transformations]#,#
-Camel Kafka Connector provides additionals SMTs:
+In addition to the https://kafka.apache.org/documentation/#connect_transforms[prebuilt transformations],
+Camel Kafka Connector provides additional SMTs:
[cols="^,^ ", options="header"]
|===
|Transform | Description
|xref:transformers/fieldsToHeaders.adoc[FieldsToHeaders]
-|Extract fields from a configured record value (struct, schema and map are supported)
-and copy the value in configured headers
+|Extract fields from a Kafka Connect record's value or key and copy them into the configured headers (supported inputs: struct with schema, map, or the whole value/key)
+
|===
diff --git a/docs/modules/ROOT/pages/transformers/fieldsToHeaders.adoc b/docs/modules/ROOT/pages/transformers/fieldsToHeaders.adoc
index 4ebc2d1..2fbd0c2 100644
--- a/docs/modules/ROOT/pages/transformers/fieldsToHeaders.adoc
+++ b/docs/modules/ROOT/pages/transformers/fieldsToHeaders.adoc
@@ -17,6 +17,8 @@ field 'a' goes to header 'X', field 'b' goes to header 'Y' etc...
Any null values are passed through unmodified.
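+If `transforms.fieldsToHeadersTransform.fields` is empty or omitted, the whole value (or key, depending on the transformation type used) is copied into the first header listed in `transforms.fieldsToHeadersTransform.headers`; any additional headers are ignored.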
+
Use the concrete transformation type designed for the record key
(`org.apache.camel.kafkaconnector.transforms.FieldsToHeadersTransform$Key`) or value
(`org.apache.camel.kafkaconnector.transforms.FieldsToHeadersTransform$Value`).
@@ -25,6 +27,8 @@ Use the concrete transformation type designed for the record key
The following examples show how to use FieldsToHeaders.
+=== Given the following configuration:
+
[source,java-properties]
----
transforms=FieldsToHeadersTransform
@@ -32,7 +36,7 @@ transforms.fieldsToHeadersTransform.type=org.apache.camel.kafkaconnector.transfo
transforms.fieldsToHeadersTransform.fields=FROM,TO,CC,SUBJECT
transforms.fieldsToHeadersTransform.headers=from,to,cc,subject
----
-Given the following message:
+and the following message:
[source,json]
----
@@ -58,6 +62,29 @@ subject: "Needs Attention!"
body: "There is an issue that needs your attention!"
----
+=== Given the following configuration:
+
+[source,java-properties]
+----
+transforms=FieldsToHeadersTransform
+transforms.fieldsToHeadersTransform.type=org.apache.camel.kafkaconnector.transforms.FieldsToHeadersTransform$Key
+transforms.fieldsToHeadersTransform.headers=camel.kafka.KEY
+----
+and the following message:
+
+[source]
+----
+kafka record with:
+ value: "my message"
+ key: 100
+----
+the following headers will be added to the Kafka ConnectRecord object:
+
+[source]
+----
+ camel.kafka.KEY: 100
+----
+
== Properties
[cols="^,^ ", options="header"]