You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2022/10/06 16:41:43 UTC

[camel-kamelets] branch deduplicate created (now 5ee126df)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch deduplicate
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git


      at 5ee126df Adding a bean to deduplicate headers

This branch includes the following new commits:

     new 5ee126df Adding a bean to deduplicate headers

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kamelets] 01/01: Adding a bean to deduplicate headers

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch deduplicate
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git

commit 5ee126dfd8604041d494d5cba2882981a4f6a400
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Oct 6 18:41:04 2022 +0200

    Adding a bean to deduplicate headers
---
 kamelets/kafka-not-secured-sink.kamelet.yaml       |  15 +++
 .../utils/headers/DeDuplicateNamingHeaders.java    | 108 +++++++++++++++++++++
 ...eadersTest.java => DeDuplicateHeadersTest.java} |  55 +++++++----
 ...eHeadersTest.java => DuplicateHeadersTest.java} |   5 +-
 .../kamelets/kafka-not-secured-sink.kamelet.yaml   |  15 +++
 5 files changed, 180 insertions(+), 18 deletions(-)

diff --git a/kamelets/kafka-not-secured-sink.kamelet.yaml b/kamelets/kafka-not-secured-sink.kamelet.yaml
index e9b97ad3..05a20deb 100644
--- a/kamelets/kafka-not-secured-sink.kamelet.yaml
+++ b/kamelets/kafka-not-secured-sink.kamelet.yaml
@@ -55,11 +55,26 @@ spec:
   dependencies:
     - "camel:core"
     - "camel:kafka"
+    - "github:apache.camel-kamelets:camel-kamelets-utils:main-SNAPSHOT"
     - "camel:kamelet"
   template:
+    beans:
+      - name: deDuplicateHeaders
+        type: "#class:org.apache.camel.kamelets.utils.headers.DeDuplicateNamingHeaders"
+        property:
+          - key: prefix
+            value: 'kafka.'
+          - key: renamingPrefix
+            value: 'kafka.'
+          - key: mode
+            value: 'filtering'
+          - key: selectedHeaders
+            value: 'kafka.key,kafka.topic,kafka.override_topic'
     from:
       uri: "kamelet:source"
       steps:
+      - process:
+          ref: "{{deDuplicateHeaders}}"
       - choice:
           when:
           - simple: "${header[key]}"
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/headers/DeDuplicateNamingHeaders.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/headers/DeDuplicateNamingHeaders.java
new file mode 100644
index 00000000..b1614554
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/headers/DeDuplicateNamingHeaders.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kamelets.utils.headers;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Processor;
+
+public class DeDuplicateNamingHeaders implements Processor {
+
+	String prefix;
+	String renamingPrefix;
+	String selectedHeaders;
+	String mode = "all";
+
+	/**
+	 * Default constructor.
+	 */
+	public DeDuplicateNamingHeaders() {
+	}
+
+	/**
+	 * Constructor using fields.
+	 * 
+	 * @param prefix         a prefix to find all the headers to rename.
+	 * @param renamingPrefix the renaming prefix to use on all the matching headers
+	 */
+	public DeDuplicateNamingHeaders(String prefix, String renamingPrefix, String selectedHeaders, String mode) {
+		this.prefix = prefix;
+		this.renamingPrefix = renamingPrefix;
+		this.selectedHeaders = selectedHeaders;
+		this.mode = mode;
+	}
+
+	public void process(Exchange ex) throws InvalidPayloadException {
+		Map<String, Object> originalHeaders = ex.getMessage().getHeaders();
+		Map<String, Object> newHeaders = new HashMap<>();
+	    Iterator<Map.Entry<String, Object>> iterator = originalHeaders.entrySet().iterator();
+	    while (iterator.hasNext()) {
+	        Map.Entry<String, Object> entry = iterator.next();
+			String key = entry.getKey();
+			Object val = entry.getValue();
+			if (prefix != null && mode.equalsIgnoreCase("all")) {
+				if (key.startsWith(prefix)) {
+					String newKey = key.replaceFirst(prefix, renamingPrefix);
+					String subKey = newKey.substring(renamingPrefix.length());
+					String suffix = subKey.toUpperCase();
+					newHeaders.put(renamingPrefix + suffix, val);
+					iterator.remove();
+				}
+			} else {
+				if (selectedHeaders != null && mode.equalsIgnoreCase("filtering")) {
+					List<String> headerList = Arrays.asList(selectedHeaders.split(","));
+					for (Iterator iteratorHeader = headerList.iterator(); iteratorHeader.hasNext();) {
+						String header = (String) iteratorHeader.next();
+						if (key.equalsIgnoreCase(header)) {
+							String newKey = key.replaceFirst(prefix, renamingPrefix);
+							String subKey = newKey.substring(renamingPrefix.length());
+							String suffix = subKey.toUpperCase();
+							newHeaders.put(renamingPrefix + suffix, val);
+							iterator.remove();
+						} 
+					}
+
+				}
+	       }
+	    }
+		originalHeaders.putAll(newHeaders);
+		ex.getMessage().setHeaders(originalHeaders);
+	}
+
+	public void setPrefix(String prefix) {
+		this.prefix = prefix;
+	}
+
+	public void setRenamingPrefix(String renamingPrefix) {
+		this.renamingPrefix = renamingPrefix;
+	}
+
+	public void setSelectedHeaders(String selectedHeaders) {
+		this.selectedHeaders = selectedHeaders;
+	}
+
+	public void setMode(String mode) {
+		this.mode = mode;
+	}	
+
+}
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/RenameHeadersTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DeDuplicateHeadersTest.java
similarity index 57%
copy from library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/RenameHeadersTest.java
copy to library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DeDuplicateHeadersTest.java
index f2510f34..af6c92f2 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/RenameHeadersTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DeDuplicateHeadersTest.java
@@ -24,13 +24,13 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-class RenameHeadersTest {
+class DeDuplicateHeadersTest {
 
     private DefaultCamelContext camelContext;
 
     private final ObjectMapper mapper = new ObjectMapper();
 
-    private DuplicateNamingHeaders processor;
+    private DeDuplicateNamingHeaders processor;
 
     @BeforeEach
     void setup() {
@@ -41,17 +41,19 @@ class RenameHeadersTest {
     void shouldDuplicateHeaders() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
 
-        exchange.getMessage().setHeader("CamelAwsS3Key", "test.txt");
-        exchange.getMessage().setHeader("CamelAwsS3BucketName", "kamelets-demo");
+        exchange.getMessage().setHeader("kafka.topic", "peppe");
+        exchange.getMessage().setHeader("kafka.key", "peppe");
+        exchange.getMessage().setHeader("kafka.override_topic", "peppe");
         exchange.getMessage().setHeader("my-header", "header");
 
-        processor = new DuplicateNamingHeaders();
-        processor.setPrefix("CamelAwsS3");
-        processor.setRenamingPrefix("aws.s3.");
+        processor = new DeDuplicateNamingHeaders();
+        processor.setPrefix("kafka.");
+        processor.setRenamingPrefix("kafka.");
         processor.process(exchange);
 
-        Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("aws.s3.key"));
-        Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("aws.s3.bucket.name"));
+        Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("kafka.KEY"));
+        Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("kafka.TOPIC"));
+        Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("kafka.OVERRIDE_TOPIC"));
         Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("my-header"));
     }
     
@@ -59,20 +61,39 @@ class RenameHeadersTest {
     void shouldDuplicateSelectedHeaders() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
 
-        exchange.getMessage().setHeader("CamelAwsS3Key", "test.txt");
-        exchange.getMessage().setHeader("CamelAwsS3BucketName", "kamelets-demo");
+        exchange.getMessage().setHeader("kafka.TOPIC", "peppe");
+        exchange.getMessage().setHeader("kafka.KEY", "peppe");
         exchange.getMessage().setHeader("my-header", "header");
+        exchange.getMessage().setHeader("kafka.override_topic", "peppe");
 
-        processor = new DuplicateNamingHeaders();
-        processor.setPrefix("CamelAwsS3");
-        processor.setRenamingPrefix("aws.s3.");
+        processor = new DeDuplicateNamingHeaders();
+        processor.setPrefix("kafka.");
+        processor.setRenamingPrefix("kafka.");
+        processor.setSelectedHeaders("kafka.TOPIC,kafka.KEY");
         processor.setMode("filtering");
-        processor.setSelectedHeaders("CamelAwsS3Key,CamelAwsS3BucketName");
         processor.process(exchange);
 
-        Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("aws.s3.key"));
-        Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("aws.s3.bucket.name"));
+        Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("kafka.KEY"));
+        Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("kafka.TOPIC"));
+        Assertions.assertFalse(exchange.getMessage().getHeaders().containsKey("kafka.OVERRIDE_TOPIC"));
         Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("my-header"));
     }
+    
+    @Test
+    void shouldDeDuplicateSelectedHeaders() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setHeader("kafka.override_topic", "peppe");
+
+        processor = new DeDuplicateNamingHeaders();
+        processor.setPrefix("kafka.");
+        processor.setRenamingPrefix("kafka.");
+        processor.setSelectedHeaders("kafka.topic,kafka.key");
+        processor.setMode("filtering");
+        processor.process(exchange);
+
+        Assertions.assertFalse(exchange.getMessage().getHeaders().containsKey("kafka.OVERRIDE_TOPIC"));
+
+    }
 
 }
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/RenameHeadersTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DuplicateHeadersTest.java
similarity index 98%
rename from library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/RenameHeadersTest.java
rename to library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DuplicateHeadersTest.java
index f2510f34..fdff9401 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/RenameHeadersTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DuplicateHeadersTest.java
@@ -17,6 +17,9 @@
 package org.apache.camel.kamelets.utils.headers;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.HashMap;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.support.DefaultExchange;
@@ -24,7 +27,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-class RenameHeadersTest {
+class DuplicateHeadersTest {
 
     private DefaultCamelContext camelContext;
 
diff --git a/library/camel-kamelets/src/main/resources/kamelets/kafka-not-secured-sink.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/kafka-not-secured-sink.kamelet.yaml
index e9b97ad3..05a20deb 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/kafka-not-secured-sink.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/kafka-not-secured-sink.kamelet.yaml
@@ -55,11 +55,26 @@ spec:
   dependencies:
     - "camel:core"
     - "camel:kafka"
+    - "github:apache.camel-kamelets:camel-kamelets-utils:main-SNAPSHOT"
     - "camel:kamelet"
   template:
+    beans:
+      - name: deDuplicateHeaders
+        type: "#class:org.apache.camel.kamelets.utils.headers.DeDuplicateNamingHeaders"
+        property:
+          - key: prefix
+            value: 'kafka.'
+          - key: renamingPrefix
+            value: 'kafka.'
+          - key: mode
+            value: 'filtering'
+          - key: selectedHeaders
+            value: 'kafka.key,kafka.topic,kafka.override_topic'
     from:
       uri: "kamelet:source"
       steps:
+      - process:
+          ref: "{{deDuplicateHeaders}}"
       - choice:
           when:
           - simple: "${header[key]}"