You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/02/05 13:20:39 UTC
(camel) branch main updated: CAMEL-20382: camel-kafka - RecordMetadata header naming convention (#13005)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 3bcad339dec CAMEL-20382: camel-kafka - RecordMetadata header naming convention (#13005)
3bcad339dec is described below
commit 3bcad339dec96fc5928afc95751453f746a705cb
Author: Jono Morris <jo...@apache.org>
AuthorDate: Tue Feb 6 02:20:31 2024 +1300
CAMEL-20382: camel-kafka - RecordMetadata header naming convention (#13005)
* CAMEL-20382: camel-kafka - follow naming convention for RecordMetadata header
* CAMEL-20382: use header name kafka.RECORD_META
* CAMEL-20382: tweak upgrade guide
* CAMEL-20382: mention that the header constant has changed
---
.../resources/org/apache/camel/catalog/components/kafka.json | 2 +-
.../resources/org/apache/camel/component/kafka/kafka.json | 2 +-
.../java/org/apache/camel/component/kafka/KafkaConstants.java | 2 +-
.../java/org/apache/camel/component/kafka/KafkaProducer.java | 2 +-
.../camel/component/kafka/producer/support/ProducerUtil.java | 4 ++--
.../org/apache/camel/component/kafka/KafkaProducerTest.java | 10 +++++-----
.../camel/component/kafka/integration/KafkaProducerFullIT.java | 10 +++++-----
.../camel/component/kafka/integration/KafkaTransactionIT.java | 2 +-
.../modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc | 6 ++++++
.../builder/endpoint/dsl/KafkaEndpointBuilderFactory.java | 7 +++----
10 files changed, 26 insertions(+), 21 deletions(-)
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index ae4af361362..f1a6c6e0dd2 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -150,7 +150,7 @@
"kafka.LAST_POLL_RECORD": { "index": 8, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Indicates the last record within the current poll request (only available if autoCommitEnable endpoint parameter is false or allowManualCommit is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#LAST_POLL_R [...]
"kafka.TIMESTAMP": { "index": 9, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The timestamp of the message", "constantName": "org.apache.camel.component.kafka.KafkaConstants#TIMESTAMP" },
"kafka.OVERRIDE_TIMESTAMP": { "index": 10, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ProducerRecord also has an associated timestamp. If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved.", "constantName": "org.apache.camel.componen [...]
- "org.apache.kafka.clients.producer.RecordMetadata": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "List<RecordMetadata>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The metadata (only configured if recordMetadata endpoint parameter is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORDMETA" },
+ "kafka.RECORD_META": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "List<RecordMetadata>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The metadata (only configured if recordMetadata endpoint parameter is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORD_META" },
"CamelKafkaManualCommit": { "index": 12, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommit", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Can be used for forcing manual offset commit when using Kafka consumer.", "constantName": "org.apache.camel.component.kafka.KafkaConstants#MANUAL_COMMIT" }
},
"properties": {
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index ae4af361362..f1a6c6e0dd2 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -150,7 +150,7 @@
"kafka.LAST_POLL_RECORD": { "index": 8, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Indicates the last record within the current poll request (only available if autoCommitEnable endpoint parameter is false or allowManualCommit is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#LAST_POLL_R [...]
"kafka.TIMESTAMP": { "index": 9, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The timestamp of the message", "constantName": "org.apache.camel.component.kafka.KafkaConstants#TIMESTAMP" },
"kafka.OVERRIDE_TIMESTAMP": { "index": 10, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ProducerRecord also has an associated timestamp. If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved.", "constantName": "org.apache.camel.componen [...]
- "org.apache.kafka.clients.producer.RecordMetadata": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "List<RecordMetadata>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The metadata (only configured if recordMetadata endpoint parameter is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORDMETA" },
+ "kafka.RECORD_META": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "List<RecordMetadata>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The metadata (only configured if recordMetadata endpoint parameter is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORD_META" },
"CamelKafkaManualCommit": { "index": 12, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommit", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Can be used for forcing manual offset commit when using Kafka consumer.", "constantName": "org.apache.camel.component.kafka.KafkaConstants#MANUAL_COMMIT" }
},
"properties": {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 53d9afe57c0..150614ac7d9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -65,7 +65,7 @@ public final class KafkaConstants {
@Metadata(label = "producer",
description = "The metadata (only configured if `recordMetadata` endpoint parameter is `true`)",
javaType = "List<RecordMetadata>")
- public static final String KAFKA_RECORDMETA = "org.apache.kafka.clients.producer.RecordMetadata";
+ public static final String KAFKA_RECORD_META = "kafka.RECORD_META";
@Metadata(label = "consumer", description = "Can be used for forcing manual offset commit when using Kafka consumer.",
javaType = "org.apache.camel.component.kafka.consumer.KafkaManualCommit")
public static final String MANUAL_COMMIT = "CamelKafkaManualCommit";
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 77d3c4450c2..76d68286e1d 100755
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -416,7 +416,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
// This sets an empty metadata for the very first message on the batch
List<RecordMetadata> recordMetadata = new ArrayList<>();
if (configuration.isRecordMetadata()) {
- exchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadata);
+ exchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORD_META, recordMetadata);
}
while (recordIterable.hasNext()) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java
index ba15dc32b77..992726eec3a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java
@@ -75,11 +75,11 @@ public final class ProducerUtil {
public static void setRecordMetadata(Object body, List<RecordMetadata> recordMetadataList) {
if (body instanceof Exchange) {
Exchange ex = (Exchange) body;
- ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadataList);
+ ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORD_META, recordMetadataList);
}
if (body instanceof Message) {
Message msg = (Message) body;
- msg.setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadataList);
+ msg.setHeader(KafkaConstants.KAFKA_RECORD_META, recordMetadataList);
}
}
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 7966e8108cd..5d6dafe3ca9 100755
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -534,21 +534,21 @@ public class KafkaProducerTest {
}
private void assertRecordMetadataTimestampExists() {
- List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
+ List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORD_META);
assertNotNull(recordMetaData1);
assertEquals(1, recordMetaData1.size(), "Expected one recordMetaData");
assertNotNull(recordMetaData1.get(0));
}
private void assertRecordMetadataExists() {
- List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
+ List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORD_META);
assertNotNull(recordMetaData1);
assertEquals(1, recordMetaData1.size(), "Expected one recordMetaData");
assertNotNull(recordMetaData1.get(0));
}
private void assertRecordMetadataExists(final int numMetadata) {
- List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
+ List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORD_META);
assertNotNull(recordMetaData1);
assertEquals(recordMetaData1.size(), numMetadata, "Expected one recordMetaData");
assertNotNull(recordMetaData1.get(0));
@@ -558,7 +558,7 @@ public class KafkaProducerTest {
List<Exchange> exchanges = (List<Exchange>) in.getBody();
for (Exchange ex : exchanges) {
List<RecordMetadata> recordMetaData
- = (List<RecordMetadata>) ex.getMessage().getHeader(KafkaConstants.KAFKA_RECORDMETA);
+ = (List<RecordMetadata>) ex.getMessage().getHeader(KafkaConstants.KAFKA_RECORD_META);
assertNotNull(recordMetaData);
assertEquals(1, recordMetaData.size(), "Expected one recordMetaData");
assertNotNull(recordMetaData.get(0));
@@ -568,7 +568,7 @@ public class KafkaProducerTest {
private void assertRecordMetadataExistsForEachAggregatedMessage() {
List<Message> messages = (List<Message>) in.getBody();
for (Message msg : messages) {
- List<RecordMetadata> recordMetaData = (List<RecordMetadata>) msg.getHeader(KafkaConstants.KAFKA_RECORDMETA);
+ List<RecordMetadata> recordMetaData = (List<RecordMetadata>) msg.getHeader(KafkaConstants.KAFKA_RECORD_META);
assertNotNull(recordMetaData);
assertEquals(1, recordMetaData.size(), "Expected one recordMetaData");
assertNotNull(recordMetaData.get(0));
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java
index 998b8f355b2..07e8bcd95ef 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java
@@ -206,7 +206,7 @@ public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport {
for (Exchange exchange : exchangeList) {
@SuppressWarnings("unchecked")
List<RecordMetadata> recordMetaData1
- = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+ = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META));
assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected.");
assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is positive");
assertTrue(recordMetaData1.get(0).topic().startsWith("test"), "Topic Name start with 'test'");
@@ -238,7 +238,7 @@ public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport {
for (Exchange exchange : exchangeList) {
@SuppressWarnings("unchecked")
List<RecordMetadata> recordMetaData1
- = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+ = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META));
assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected.");
assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is positive");
assertTrue(recordMetaData1.get(0).topic().startsWith("test"), "Topic Name start with 'test'");
@@ -299,7 +299,7 @@ public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport {
assertEquals(2, exchangeList.size(), "Two Exchanges are expected");
Exchange e1 = exchangeList.get(0);
@SuppressWarnings("unchecked")
- List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) (e1.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+ List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) (e1.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META));
assertEquals(10, recordMetaData1.size(), "Ten RecordMetadata is expected.");
for (RecordMetadata recordMeta : recordMetaData1) {
assertTrue(recordMeta.offset() >= 0, "Offset is positive");
@@ -307,7 +307,7 @@ public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport {
}
Exchange e2 = exchangeList.get(1);
@SuppressWarnings("unchecked")
- List<RecordMetadata> recordMetaData2 = (List<RecordMetadata>) (e2.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+ List<RecordMetadata> recordMetaData2 = (List<RecordMetadata>) (e2.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META));
assertEquals(5, recordMetaData2.size(), "Five RecordMetadata is expected.");
for (RecordMetadata recordMeta : recordMetaData2) {
assertTrue(recordMeta.offset() >= 0, "Offset is positive");
@@ -346,7 +346,7 @@ public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport {
for (Exchange exchange : exchangeList) {
@SuppressWarnings("unchecked")
List<RecordMetadata> recordMetaData1
- = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+ = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META));
assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected.");
assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is positive");
assertTrue(recordMetaData1.get(0).topic().startsWith("test"), "Topic Name start with 'test'");
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
index f038dc0af85..51ac5f10cdf 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
@@ -163,7 +163,7 @@ public class KafkaTransactionIT extends BaseEmbeddedKafkaTestSupport {
for (Exchange exchange : exchangeList) {
@SuppressWarnings("unchecked")
List<RecordMetadata> recordMetaData1
- = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+ = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META));
assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected.");
assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is positive");
assertTrue(recordMetaData1.get(0).topic().startsWith("transaction"), "Topic Name start with 'transaction'");
diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
index 63d3b3ad05a..c14c7e5f5bc 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
@@ -162,3 +162,9 @@ This option is only intended for the Load Balancer EIP. This makes the YAML sche
=== camel-hdfs
The HDFS component has been deprecated, and planned to be removed in 4.4 (see CAMEL-20196).
+
+=== camel-kafka
+
+The header name for the `List<RecordMetadata>` metadata has changed from
+`org.apache.kafka.clients.producer.RecordMetadata` to `kafka.RECORD_META`,
+and the header constant from `KAFKA_RECORDMETA` to `KAFKA_RECORD_META`.
\ No newline at end of file
diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index cfdcf26558b..20c82a9b412 100644
--- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -5338,11 +5338,10 @@ public interface KafkaEndpointBuilderFactory {
*
* Group: producer
*
- * @return the name of the header {@code
- * org.apache.kafka.clients.producer.RecordMetadata}.
+ * @return the name of the header {@code kafka.RECORD_META}.
*/
- public String orgApacheKafkaClientsProducerRecordmetadata() {
- return "org.apache.kafka.clients.producer.RecordMetadata";
+ public String kafkaRecordMeta() {
+ return "kafka.RECORD_META";
}
/**