You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/02/05 13:20:39 UTC

(camel) branch main updated: CAMEL-20382: camel-kafka - RecordMetadata header naming convention (#13005)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bcad339dec CAMEL-20382: camel-kafka - RecordMetadata header naming convention (#13005)
3bcad339dec is described below

commit 3bcad339dec96fc5928afc95751453f746a705cb
Author: Jono Morris <jo...@apache.org>
AuthorDate: Tue Feb 6 02:20:31 2024 +1300

    CAMEL-20382: camel-kafka - RecordMetadata header naming convention (#13005)
    
    * CAMEL-20382: camel-kafka - follow naming convention for RecordMetadata header
    
    * CAMEL-20382: use header name kafka.RECORD_META
    
    * CAMEL-20382: tweak upgrade guide
    
    * CAMEL-20382: mention that the header constant has changed
---
 .../resources/org/apache/camel/catalog/components/kafka.json   |  2 +-
 .../resources/org/apache/camel/component/kafka/kafka.json      |  2 +-
 .../java/org/apache/camel/component/kafka/KafkaConstants.java  |  2 +-
 .../java/org/apache/camel/component/kafka/KafkaProducer.java   |  2 +-
 .../camel/component/kafka/producer/support/ProducerUtil.java   |  4 ++--
 .../org/apache/camel/component/kafka/KafkaProducerTest.java    | 10 +++++-----
 .../camel/component/kafka/integration/KafkaProducerFullIT.java | 10 +++++-----
 .../camel/component/kafka/integration/KafkaTransactionIT.java  |  2 +-
 .../modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc         |  6 ++++++
 .../builder/endpoint/dsl/KafkaEndpointBuilderFactory.java      |  7 +++----
 10 files changed, 26 insertions(+), 21 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index ae4af361362..f1a6c6e0dd2 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -150,7 +150,7 @@
     "kafka.LAST_POLL_RECORD": { "index": 8, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Indicates the last record within the current poll request (only available if autoCommitEnable endpoint parameter is false or allowManualCommit is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#LAST_POLL_R [...]
     "kafka.TIMESTAMP": { "index": 9, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The timestamp of the message", "constantName": "org.apache.camel.component.kafka.KafkaConstants#TIMESTAMP" },
     "kafka.OVERRIDE_TIMESTAMP": { "index": 10, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ProducerRecord also has an associated timestamp. If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved.", "constantName": "org.apache.camel.componen [...]
-    "org.apache.kafka.clients.producer.RecordMetadata": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "List<RecordMetadata>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The metadata (only configured if recordMetadata endpoint parameter is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORDMETA" },
+    "kafka.RECORD_META": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "List<RecordMetadata>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The metadata (only configured if recordMetadata endpoint parameter is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORD_META" },
     "CamelKafkaManualCommit": { "index": 12, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommit", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Can be used for forcing manual offset commit when using Kafka consumer.", "constantName": "org.apache.camel.component.kafka.KafkaConstants#MANUAL_COMMIT" }
   },
   "properties": {
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index ae4af361362..f1a6c6e0dd2 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -150,7 +150,7 @@
     "kafka.LAST_POLL_RECORD": { "index": 8, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Indicates the last record within the current poll request (only available if autoCommitEnable endpoint parameter is false or allowManualCommit is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#LAST_POLL_R [...]
     "kafka.TIMESTAMP": { "index": 9, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The timestamp of the message", "constantName": "org.apache.camel.component.kafka.KafkaConstants#TIMESTAMP" },
     "kafka.OVERRIDE_TIMESTAMP": { "index": 10, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ProducerRecord also has an associated timestamp. If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved.", "constantName": "org.apache.camel.componen [...]
-    "org.apache.kafka.clients.producer.RecordMetadata": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "List<RecordMetadata>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The metadata (only configured if recordMetadata endpoint parameter is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORDMETA" },
+    "kafka.RECORD_META": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "List<RecordMetadata>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The metadata (only configured if recordMetadata endpoint parameter is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORD_META" },
     "CamelKafkaManualCommit": { "index": 12, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommit", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Can be used for forcing manual offset commit when using Kafka consumer.", "constantName": "org.apache.camel.component.kafka.KafkaConstants#MANUAL_COMMIT" }
   },
   "properties": {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 53d9afe57c0..150614ac7d9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -65,7 +65,7 @@ public final class KafkaConstants {
     @Metadata(label = "producer",
               description = "The metadata (only configured if `recordMetadata` endpoint parameter is `true`)",
               javaType = "List<RecordMetadata>")
-    public static final String KAFKA_RECORDMETA = "org.apache.kafka.clients.producer.RecordMetadata";
+    public static final String KAFKA_RECORD_META = "kafka.RECORD_META";
     @Metadata(label = "consumer", description = "Can be used for forcing manual offset commit when using Kafka consumer.",
               javaType = "org.apache.camel.component.kafka.consumer.KafkaManualCommit")
     public static final String MANUAL_COMMIT = "CamelKafkaManualCommit";
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 77d3c4450c2..76d68286e1d 100755
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -416,7 +416,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
         // This sets an empty metadata for the very first message on the batch
         List<RecordMetadata> recordMetadata = new ArrayList<>();
         if (configuration.isRecordMetadata()) {
-            exchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadata);
+            exchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORD_META, recordMetadata);
         }
 
         while (recordIterable.hasNext()) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java
index ba15dc32b77..992726eec3a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java
@@ -75,11 +75,11 @@ public final class ProducerUtil {
     public static void setRecordMetadata(Object body, List<RecordMetadata> recordMetadataList) {
         if (body instanceof Exchange) {
             Exchange ex = (Exchange) body;
-            ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadataList);
+            ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORD_META, recordMetadataList);
         }
         if (body instanceof Message) {
             Message msg = (Message) body;
-            msg.setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadataList);
+            msg.setHeader(KafkaConstants.KAFKA_RECORD_META, recordMetadataList);
         }
     }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 7966e8108cd..5d6dafe3ca9 100755
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -534,21 +534,21 @@ public class KafkaProducerTest {
     }
 
     private void assertRecordMetadataTimestampExists() {
-        List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
+        List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORD_META);
         assertNotNull(recordMetaData1);
         assertEquals(1, recordMetaData1.size(), "Expected one recordMetaData");
         assertNotNull(recordMetaData1.get(0));
     }
 
     private void assertRecordMetadataExists() {
-        List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
+        List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORD_META);
         assertNotNull(recordMetaData1);
         assertEquals(1, recordMetaData1.size(), "Expected one recordMetaData");
         assertNotNull(recordMetaData1.get(0));
     }
 
     private void assertRecordMetadataExists(final int numMetadata) {
-        List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
+        List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORD_META);
         assertNotNull(recordMetaData1);
         assertEquals(recordMetaData1.size(), numMetadata, "Expected one recordMetaData");
         assertNotNull(recordMetaData1.get(0));
@@ -558,7 +558,7 @@ public class KafkaProducerTest {
         List<Exchange> exchanges = (List<Exchange>) in.getBody();
         for (Exchange ex : exchanges) {
             List<RecordMetadata> recordMetaData
-                    = (List<RecordMetadata>) ex.getMessage().getHeader(KafkaConstants.KAFKA_RECORDMETA);
+                    = (List<RecordMetadata>) ex.getMessage().getHeader(KafkaConstants.KAFKA_RECORD_META);
             assertNotNull(recordMetaData);
             assertEquals(1, recordMetaData.size(), "Expected one recordMetaData");
             assertNotNull(recordMetaData.get(0));
@@ -568,7 +568,7 @@ public class KafkaProducerTest {
     private void assertRecordMetadataExistsForEachAggregatedMessage() {
         List<Message> messages = (List<Message>) in.getBody();
         for (Message msg : messages) {
-            List<RecordMetadata> recordMetaData = (List<RecordMetadata>) msg.getHeader(KafkaConstants.KAFKA_RECORDMETA);
+            List<RecordMetadata> recordMetaData = (List<RecordMetadata>) msg.getHeader(KafkaConstants.KAFKA_RECORD_META);
             assertNotNull(recordMetaData);
             assertEquals(1, recordMetaData.size(), "Expected one recordMetaData");
             assertNotNull(recordMetaData.get(0));
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java
index 998b8f355b2..07e8bcd95ef 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java
@@ -206,7 +206,7 @@ public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport {
         for (Exchange exchange : exchangeList) {
             @SuppressWarnings("unchecked")
             List<RecordMetadata> recordMetaData1
-                    = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+                    = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META));
             assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected.");
             assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is positive");
             assertTrue(recordMetaData1.get(0).topic().startsWith("test"), "Topic Name start with 'test'");
@@ -238,7 +238,7 @@ public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport {
         for (Exchange exchange : exchangeList) {
             @SuppressWarnings("unchecked")
             List<RecordMetadata> recordMetaData1
-                    = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+                    = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META));
             assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected.");
             assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is positive");
             assertTrue(recordMetaData1.get(0).topic().startsWith("test"), "Topic Name start with 'test'");
@@ -299,7 +299,7 @@ public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport {
         assertEquals(2, exchangeList.size(), "Two Exchanges are expected");
         Exchange e1 = exchangeList.get(0);
         @SuppressWarnings("unchecked")
-        List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) (e1.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+        List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) (e1.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META));
         assertEquals(10, recordMetaData1.size(), "Ten RecordMetadata is expected.");
         for (RecordMetadata recordMeta : recordMetaData1) {
             assertTrue(recordMeta.offset() >= 0, "Offset is positive");
@@ -307,7 +307,7 @@ public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport {
         }
         Exchange e2 = exchangeList.get(1);
         @SuppressWarnings("unchecked")
-        List<RecordMetadata> recordMetaData2 = (List<RecordMetadata>) (e2.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+        List<RecordMetadata> recordMetaData2 = (List<RecordMetadata>) (e2.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META));
         assertEquals(5, recordMetaData2.size(), "Five RecordMetadata is expected.");
         for (RecordMetadata recordMeta : recordMetaData2) {
             assertTrue(recordMeta.offset() >= 0, "Offset is positive");
@@ -346,7 +346,7 @@ public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport {
         for (Exchange exchange : exchangeList) {
             @SuppressWarnings("unchecked")
             List<RecordMetadata> recordMetaData1
-                    = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+                    = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META));
             assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected.");
             assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is positive");
             assertTrue(recordMetaData1.get(0).topic().startsWith("test"), "Topic Name start with 'test'");
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
index f038dc0af85..51ac5f10cdf 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
@@ -163,7 +163,7 @@ public class KafkaTransactionIT extends BaseEmbeddedKafkaTestSupport {
         for (Exchange exchange : exchangeList) {
             @SuppressWarnings("unchecked")
             List<RecordMetadata> recordMetaData1
-                    = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+                    = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META));
             assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected.");
             assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is positive");
             assertTrue(recordMetaData1.get(0).topic().startsWith("transaction"), "Topic Name start with 'transaction'");
diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
index 63d3b3ad05a..c14c7e5f5bc 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
@@ -162,3 +162,9 @@ This option is only intended for the Load Balancer EIP. This makes the YAML sche
 === camel-hdfs
 
 The HDFS component has been deprecated, and planned to be removed in 4.4 (see CAMEL-20196).
+
+=== camel-kafka
+
+The header name for the `List<RecordMetadata>` metadata has changed from
+`org.apache.kafka.clients.producer.RecordMetadata` to `kafka.RECORD_META`,
+and the header constant from `KAFKA_RECORDMETA` to `KAFKA_RECORD_META`.
\ No newline at end of file
diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index cfdcf26558b..20c82a9b412 100644
--- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -5338,11 +5338,10 @@ public interface KafkaEndpointBuilderFactory {
          * 
          * Group: producer
          * 
-         * @return the name of the header {@code
-         * org.apache.kafka.clients.producer.RecordMetadata}.
+         * @return the name of the header {@code kafka.RECORD_META}.
          */
-        public String orgApacheKafkaClientsProducerRecordmetadata() {
-            return "org.apache.kafka.clients.producer.RecordMetadata";
+        public String kafkaRecordMeta() {
+            return "kafka.RECORD_META";
         }
 
         /**