You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/09/07 15:02:37 UTC
[camel-kafka-connector] branch master updated: adding a key
transformer to the AWS2-SQS connector so that the
CamelHeader.CamelAwsSqsMessageId header value is set as the key
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new cc301c2 adding a key transformer to the AWS2-SQS connector so that the CamelHeader.CamelAwsSqsMessageId header value is set as the key
new f536319 Merge pull request #431 from codexetreme/master
cc301c2 is described below
commit cc301c253cfe6a1862c7d4c3bad747ad4cbebbe4
Author: Yashodhan Ghadge <ya...@gmail.com>
AuthorDate: Mon Sep 7 19:23:23 2020 +0530
adding a key transformer to the AWS2-SQS connector so that the CamelHeader.CamelAwsSqsMessageId header value is set as the key
---
.../aws2sqs/transforms/SQSKeySetterTransform.java | 38 ++++++++++++++++++++++
1 file changed, 38 insertions(+)
diff --git a/connectors/camel-aws2-sqs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sqs/transforms/SQSKeySetterTransform.java b/connectors/camel-aws2-sqs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sqs/transforms/SQSKeySetterTransform.java
new file mode 100644
index 0000000..6296650
--- /dev/null
+++ b/connectors/camel-aws2-sqs-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2sqs/transforms/SQSKeySetterTransform.java
@@ -0,0 +1,38 @@
+package org.apache.camel.kafkaconnector.aws2sqs.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Map;
+
+public class SQSKeySetterTransform<R extends ConnectRecord<R>> implements Transformation<R> {
+
+ public static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define("test", ConfigDef.Type.STRING, "test", ConfigDef.Importance.MEDIUM, "Fetch the Camel.CamelAwsSqsMessageId header and set it as the key for the kafka record");
+
+
+ @Override
+ public R apply(R record) {
+ Headers headers = record.headers();
+ String key = (String) headers.lastWithName("CamelHeader.CamelAwsSqsMessageId").value();
+ return record.newRecord(record.topic(), record.kafkaPartition(), null, key, Schema.STRING_SCHEMA, record.value(), record.timestamp());
+ }
+
+ @Override
+ public ConfigDef config() {
+ return CONFIG_DEF;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void configure(Map<String, ?> map) {
+
+ }
+}