You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ig...@apache.org on 2022/08/11 16:07:02 UTC

[camel] branch main updated: CAMEL-18386: camel-kafka: Support using TypeConverter to parse kafka.OVERRIDE_TOPIC (#8154)

This is an automated email from the ASF dual-hosted git repository.

igarashitm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 88a13424012 CAMEL-18386: camel-kafka: Support using TypeConverter to parse kafka.OVERRIDE_TOPIC (#8154)
88a13424012 is described below

commit 88a1342401260b8d446d2e4b240c97bbde352c2d
Author: Tomohisa Igarashi <tm...@gmail.com>
AuthorDate: Thu Aug 11 12:06:51 2022 -0400

    CAMEL-18386: camel-kafka: Support using TypeConverter to parse kafka.OVERRIDE_TOPIC (#8154)
---
 .../camel/component/kafka/KafkaProducer.java       |  7 +-
 .../kafka/clients/producer/KafkaProducerTest.java  | 80 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 3 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index bcdf40e406e..958bc9ced55 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -298,9 +298,10 @@ public class KafkaProducer extends DefaultAsyncProducer {
     private String evaluateTopic(Message message) {
         // must remove header so it's not propagated.
         Object overrideTopic = message.removeHeader(KafkaConstants.OVERRIDE_TOPIC);
-        if (overrideTopic != null) {
-            LOG.debug("Using override topic: {}", overrideTopic);
-            return overrideTopic.toString();
+        String overrideTopicString = endpoint.getCamelContext().getTypeConverter().tryConvertTo(String.class, overrideTopic);
+        if (overrideTopicString != null) {
+            LOG.debug("Using override topic: {}", overrideTopicString);
+            return overrideTopicString;
         }
 
         String topic = configuration.getTopic();
diff --git a/components/camel-kafka/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
new file mode 100644
index 00000000000..e21d631e40b
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import java.util.List;
+
+import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.component.kafka.KafkaProducer;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.hamcrest.core.Is;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaProducerTest {
+
+    private MockProducer kafkaProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer());
+    private KafkaProducer camelProducer;
+    @Spy
+    private Exchange exchange;
+    @Spy
+    private Message message;
+
+    @BeforeEach
+    public void init() throws Exception {
+        CamelContext context = new DefaultCamelContext();
+        KafkaComponent component = new KafkaComponent(context);
+        camelProducer = new KafkaProducer((KafkaEndpoint) component.createEndpoint("kafka:test"));
+        camelProducer.setKafkaProducer(kafkaProducer);
+        when(exchange.getIn()).thenReturn(message);
+        when(exchange.getMessage()).thenReturn(message);
+        when(exchange.getContext()).thenReturn(context);
+        when(message.getHeader("kafka.PARTITION_KEY", Integer.class)).thenReturn(0);
+        when(message.getHeader("kafka.KEY")).thenReturn("key");
+    }
+
+    @AfterEach
+    public void after() {
+        kafkaProducer.clear();
+    }
+
+    @Test
+    public void testSendOverrideTopic() throws Exception {
+        when(message.removeHeader("kafka.OVERRIDE_TOPIC")).thenReturn("overridden-topic");
+        camelProducer.process(exchange);
+        when(message.removeHeader("kafka.OVERRIDE_TOPIC")).thenReturn(new TextNode("overridden-topic-jackson"));
+        camelProducer.process(exchange);
+        List<ProducerRecord<Object, Object>> records = kafkaProducer.history();
+        assertThat(records.get(0).topic(), Is.is("overridden-topic"));
+        assertThat(records.get(1).topic(), Is.is("overridden-topic-jackson"));
+    }
+
+}