You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/09/07 15:02:37 UTC

[camel-kafka-connector] branch master updated: adding a key transformer to the AWS2-SQS connector so that the CamelHeader.CamelAwsSqsMessageId header value is set as the key

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new cc301c2  adding a key transformer to the AWS2-SQS connector so that the CamelHeader.CamelAwsSqsMessageId header value is set as the key
     new f536319  Merge pull request #431 from codexetreme/master
cc301c2 is described below

commit cc301c253cfe6a1862c7d4c3bad747ad4cbebbe4
Author: Yashodhan Ghadge <ya...@gmail.com>
AuthorDate: Mon Sep 7 19:23:23 2020 +0530

    adding a key transformer to the AWS2-SQS connector so that the CamelHeader.CamelAwsSqsMessageId header value is set as the key
---
 .../aws2sqs/transforms/SQSKeySetterTransform.java  | 38 ++++++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/connectors/camel-aws2-sqs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sqs/transforms/SQSKeySetterTransform.java b/connectors/camel-aws2-sqs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sqs/transforms/SQSKeySetterTransform.java
new file mode 100644
index 0000000..6296650
--- /dev/null
+++ b/connectors/camel-aws2-sqs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sqs/transforms/SQSKeySetterTransform.java
@@ -0,0 +1,38 @@
+package org.apache.camel.kafkaconnector.aws2sqs.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Map;
+
+/**
+ * Kafka Connect transformation that keys each record by the value of its
+ * {@code CamelHeader.CamelAwsSqsMessageId} header.
+ *
+ * <p>The transformation holds no state and reads no configuration values.
+ */
+public class SQSKeySetterTransform<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    // NOTE(review): "test" looks like a placeholder option name; nothing in this class reads it.
+    // It is kept as-is so the published configuration surface does not change.
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define("test", ConfigDef.Type.STRING, "test", ConfigDef.Importance.MEDIUM, "Fetch the CamelHeader.CamelAwsSqsMessageId header and set it as the key for the kafka record");
+
+    /** Name of the record header whose value becomes the record key. */
+    private static final String MESSAGE_ID_HEADER = "CamelHeader.CamelAwsSqsMessageId";
+
+    /**
+     * Creates a new record whose key is the value of the message id header.
+     *
+     * @param record the record to re-key
+     * @return the re-keyed record, or {@code record} itself when the header is missing
+     *         or its value is not a {@link String}
+     */
+    @Override
+    public R apply(R record) {
+        Headers headers = record.headers();
+        org.apache.kafka.connect.header.Header messageId = headers.lastWithName(MESSAGE_ID_HEADER);
+        if (messageId == null || !(messageId.value() instanceof String)) {
+            // No usable message id: pass the record through untouched instead of failing
+            // with a NullPointerException / ClassCastException.
+            return record;
+        }
+        String key = (String) messageId.value();
+        // NOTE(review): the key schema is passed as null and the value schema is forced to
+        // STRING_SCHEMA; this may be a transposition of the two schema arguments - confirm
+        // before changing, since converters observe the difference.
+        return record.newRecord(record.topic(), record.kafkaPartition(), null, key, Schema.STRING_SCHEMA, record.value(), record.timestamp());
+    }
+
+    /**
+     * @return the configuration definition exposed by this transformation
+     */
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    /** Does nothing: this transformation holds no resources. */
+    @Override
+    public void close() {
+
+    }
+
+    /**
+     * Does nothing: no configuration value is read.
+     *
+     * @param map the transformation configuration (ignored)
+     */
+    @Override
+    public void configure(Map<String, ?> map) {
+
+    }
+}