You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/06/04 07:19:47 UTC

[camel] branch camel-3.14.x updated: [CAMEL-18159] SendDynamicAware of several components parses uri that starts with schema:// incorectly (#7713)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.14.x by this push:
     new f0f0f1bd742 [CAMEL-18159] SendDynamicAware of several components parses uri that starts with schema:// incorectly (#7713)
f0f0f1bd742 is described below

commit f0f0f1bd7428caa6f9cc2e6a8cb66ca6308ea9ce
Author: artemse <ar...@gmail.com>
AuthorDate: Sat Jun 4 10:18:55 2022 +0300

    [CAMEL-18159] SendDynamicAware of several components parses uri that starts with schema:// incorectly (#7713)
    
    * [CAMEL-18459] Fixes incorrect JmsSendDynamicAware parsing for destinations that starts with "schema://" and dose not have queue: or topic: prefix
    
    * [CAMEL-18459] redundant empty line removed
    
    * [CAMEL-18459] cleanup JmsToDSendDynamicTest, assertion on message body added
    
    * [CAMEL-18459] RabbitMQSendDynamicAware incorrectly parses exchange name if it starts with "schema://:
    
    * [CAMEL-18459] SpringRabbitMQSendDynamicAware incorrectly parses exchange name if it starts with "schema://:
    
    * [CAMEL-18459] incorrect schema in SpringRabbitMQSendDynamicAwareTest fixed
    
    * SjmsSendDynamicAware incorrectly parses destination if it starts with schema://
    
    * [CAMEL-18459] VertxKafkaSendDynamicAware incorrectly parses topic name if it starts with "schema://:
    
    * [CAMEL-18459] PahoSendDynamicAware incorrectly parses topic name if it starts with "schema://:
    
    * [CAMEL-18459] remove file added by mistake
    
    * [CAMEL-18159] missing license info added
    
    * [CAMEL-18159] PahoMqttSendDynamicAware incorrectly parses topic name if it starts with "schema://:
---
 .../camel/component/jms/JmsToDSendDynamicTest.java |  6 +--
 .../paho/mqtt5/PahoMqtt5SendDynamicAware.java      |  1 +
 .../paho/mqtt5/PahoMqtt5SendDynamicAwareTest.java  | 56 +++++++++++++++++++++
 .../camel/component/paho/PahoSendDynamicAware.java |  1 +
 .../component/paho/PahoToDSendDynamicTest.java     |  7 +++
 .../rabbitmq/RabbitMQSendDynamicAware.java         |  1 +
 .../rabbitmq/RabbitMQSendDynamicAwareTest.java     | 57 ++++++++++++++++++++++
 .../camel/component/sjms/SjmsSendDynamicAware.java |  1 +
 .../component/sjms/SjmsSendDynamicAwareTest.java   | 57 ++++++++++++++++++++++
 .../SpringRabbitMQSendDynamicAware.java            |  1 +
 .../SpringRabbitMQSendDynamicAwareTest.java        | 57 ++++++++++++++++++++++
 .../vertx/kafka/VertxKafkaSendDynamicAware.java    |  1 +
 .../kafka/VertxKafkaSendDynamicAwareTest.java      | 57 ++++++++++++++++++++++
 13 files changed, 299 insertions(+), 4 deletions(-)

diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsToDSendDynamicTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsToDSendDynamicTest.java
index cd227b1b471..4778953932d 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsToDSendDynamicTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsToDSendDynamicTest.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.jms;
 import javax.jms.ConnectionFactory;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.junit.jupiter.api.Test;
@@ -47,9 +46,8 @@ public class JmsToDSendDynamicTest extends CamelTestSupport {
     @Test
     public void testToDSlashed() {
         template.sendBodyAndHeader("direct:startSlashed", "Hello bar", "where", "bar");
-
-        Exchange exchange = consumer.receive("activemq://bar", 2000);
-        exchange.getMessage().getHeader(JmsConstants.JMS_DESTINATION_NAME);
+        String out = consumer.receiveBody("activemq://bar", 2000, String.class);
+        assertEquals("Hello bar", out);
     }
 
     @Override
diff --git a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5SendDynamicAware.java b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5SendDynamicAware.java
index caae4b75aa1..14dab516ad9 100644
--- a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5SendDynamicAware.java
+++ b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5SendDynamicAware.java
@@ -104,6 +104,7 @@ public class PahoMqtt5SendDynamicAware extends ServiceSupport implements SendDyn
 
     private String parseTopicName(String uri) {
         // strip query
+        uri = uri.replaceFirst(scheme + "://", ":");
         int pos = uri.indexOf('?');
         if (pos != -1) {
             uri = uri.substring(0, pos);
diff --git a/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5SendDynamicAwareTest.java b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5SendDynamicAwareTest.java
new file mode 100644
index 00000000000..63b0806d4fe
--- /dev/null
+++ b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5SendDynamicAwareTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho.mqtt5;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.SendDynamicAware;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class PahoMqtt5SendDynamicAwareTest extends CamelTestSupport {
+    PahoMqtt5SendDynamicAware pahoMqtt5SendDynamicAware;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        this.pahoMqtt5SendDynamicAware = new PahoMqtt5SendDynamicAware();
+    }
+
+    @Test
+    public void testUriParsing() throws Exception {
+        this.pahoMqtt5SendDynamicAware.setScheme("paho-mqtt5");
+        Exchange exchange = createExchangeWithBody("The Body");
+        SendDynamicAware.DynamicAwareEntry entry = new SendDynamicAware.DynamicAwareEntry("paho-mqtt5:destination", "paho-mqtt5:${header.test}", null, null);
+        Processor processor = this.pahoMqtt5SendDynamicAware.createPreProcessor(createExchangeWithBody("Body"), entry);
+        processor.process(exchange);
+        assertEquals("destination", exchange.getMessage().getHeader(PahoMqtt5Constants.CAMEL_PAHO_OVERRIDE_TOPIC));
+    }
+
+    @Test
+    public void testSlashedUriParsing() throws Exception {
+        this.pahoMqtt5SendDynamicAware.setScheme("paho-mqtt5");
+        Exchange exchange = createExchangeWithBody("The Body");
+        SendDynamicAware.DynamicAwareEntry entry = new SendDynamicAware.DynamicAwareEntry("paho-mqtt5://destination", "paho-mqtt5://${header.test}", null, null);
+        Processor processor = this.pahoMqtt5SendDynamicAware.createPreProcessor(createExchangeWithBody("Body"), entry);
+        processor.process(exchange);
+        assertEquals("destination", exchange.getMessage().getHeader(PahoMqtt5Constants.CAMEL_PAHO_OVERRIDE_TOPIC));
+    }
+}
\ No newline at end of file
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoSendDynamicAware.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoSendDynamicAware.java
index 31555c40870..a55500ed784 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoSendDynamicAware.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoSendDynamicAware.java
@@ -104,6 +104,7 @@ public class PahoSendDynamicAware extends ServiceSupport implements SendDynamicA
 
     private String parseTopicName(String uri) {
         // strip query
+        uri = uri.replaceFirst(scheme + "://", ":");
         int pos = uri.indexOf('?');
         if (pos != -1) {
             uri = uri.substring(0, pos);
diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java
index 34ac31fd69d..41cbba0ac9e 100644
--- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java
+++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java
@@ -57,6 +57,12 @@ public class PahoToDSendDynamicTest extends CamelTestSupport {
         out = consumer.receiveBody("paho:beer", 2000, String.class);
         assertEquals("Hello beer", out);
     }
+    @Test
+    public void testToDSlashed() {
+        template.sendBodyAndHeader("direct:startSlashed", "Hello bar", "where", "bar");
+        String out = consumer.receiveBody("paho://bar", 2000, String.class);
+        assertEquals("Hello bar", out);
+    }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
@@ -68,6 +74,7 @@ public class PahoToDSendDynamicTest extends CamelTestSupport {
 
                 // route message dynamic using toD
                 from("direct:start").toD("paho:${header.where}?retained=true");
+                from("direct:startSlashed").toD("paho://${header.where}?retained=true");
             }
         };
     }
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQSendDynamicAware.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQSendDynamicAware.java
index 93cdc63846d..35ac6758221 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQSendDynamicAware.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQSendDynamicAware.java
@@ -104,6 +104,7 @@ public class RabbitMQSendDynamicAware extends ServiceSupport implements SendDyna
 
     private String parseExchangeName(String uri) {
         // strip query
+        uri = uri.replaceFirst(scheme + "://", ":");
         int pos = uri.indexOf('?');
         if (pos != -1) {
             uri = uri.substring(0, pos);
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSendDynamicAwareTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSendDynamicAwareTest.java
new file mode 100644
index 00000000000..670d268e849
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSendDynamicAwareTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.SendDynamicAware;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RabbitMQSendDynamicAwareTest extends CamelTestSupport {
+
+    RabbitMQSendDynamicAware rabbitMQSendDynamicAware;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        this.rabbitMQSendDynamicAware = new RabbitMQSendDynamicAware();
+    }
+
+    @Test
+    public void testUriParsing() throws Exception {
+        this.rabbitMQSendDynamicAware.setScheme("rabbitmq");
+        Exchange exchange = createExchangeWithBody("The Body");
+        SendDynamicAware.DynamicAwareEntry entry = new SendDynamicAware.DynamicAwareEntry("rabbitmq:destination", "rabbitmq:${header.test}", null, null);
+        Processor processor = this.rabbitMQSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"), entry);
+        processor.process(exchange);
+        assertEquals("destination", exchange.getMessage().getHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME));
+    }
+
+    @Test
+    public void testSlashedUriParsing() throws Exception {
+        this.rabbitMQSendDynamicAware.setScheme("rabbitmq");
+        Exchange exchange = createExchangeWithBody("The Body");
+        SendDynamicAware.DynamicAwareEntry entry = new SendDynamicAware.DynamicAwareEntry("rabbitmq://destination", "rabbitmq://${header.test}", null, null);
+        Processor processor = this.rabbitMQSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"), entry);
+        processor.process(exchange);
+        assertEquals("destination", exchange.getMessage().getHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME));
+    }
+}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java
index fd3697c72fa..81d99f5dc85 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java
@@ -104,6 +104,7 @@ public class SjmsSendDynamicAware extends ServiceSupport implements SendDynamicA
 
     private String parseDestinationName(String uri) {
         // strip query
+        uri = uri.replaceFirst(scheme + "://", ":");
         int pos = uri.indexOf('?');
         if (pos != -1) {
             uri = uri.substring(0, pos);
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsSendDynamicAwareTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsSendDynamicAwareTest.java
new file mode 100644
index 00000000000..a68489c8924
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsSendDynamicAwareTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.SendDynamicAware;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SjmsSendDynamicAwareTest extends CamelTestSupport {
+
+    SjmsSendDynamicAware sjmsSendDynamicAware;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        this.sjmsSendDynamicAware = new SjmsSendDynamicAware();
+    }
+
+    @Test
+    public void testUriParsing() throws Exception {
+        this.sjmsSendDynamicAware.setScheme("sjms");
+        Exchange exchange = createExchangeWithBody("The Body");
+        SendDynamicAware.DynamicAwareEntry entry = new SendDynamicAware.DynamicAwareEntry("sjms:destination", "sjms:${header.test}", null, null);
+        Processor processor = this.sjmsSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"), entry);
+        processor.process(exchange);
+        assertEquals("destination", exchange.getMessage().getHeader(SjmsConstants.JMS_DESTINATION_NAME));
+    }
+
+    @Test
+    public void testSlashedUriParsing() throws Exception {
+        this.sjmsSendDynamicAware.setScheme("sjms");
+        Exchange exchange = createExchangeWithBody("The Body");
+        SendDynamicAware.DynamicAwareEntry entry = new SendDynamicAware.DynamicAwareEntry("sjms://destination", "sjms://${header.test}", null, null);
+        Processor processor = this.sjmsSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"), entry);
+        processor.process(exchange);
+        assertEquals("destination", exchange.getMessage().getHeader(SjmsConstants.JMS_DESTINATION_NAME));
+    }
+}
diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQSendDynamicAware.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQSendDynamicAware.java
index 9bc330a2c88..0b2eb662086 100644
--- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQSendDynamicAware.java
+++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQSendDynamicAware.java
@@ -127,6 +127,7 @@ public class SpringRabbitMQSendDynamicAware extends ServiceSupport implements Se
 
     private String parseExchangeName(String uri) {
         // strip query
+        uri = uri.replaceFirst(scheme + "://", ":");
         int pos = uri.indexOf('?');
         if (pos != -1) {
             uri = uri.substring(0, pos);
diff --git a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/SpringRabbitMQSendDynamicAwareTest.java b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/SpringRabbitMQSendDynamicAwareTest.java
new file mode 100644
index 00000000000..1957ae60c78
--- /dev/null
+++ b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/SpringRabbitMQSendDynamicAwareTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.springrabbit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.SendDynamicAware;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SpringRabbitMQSendDynamicAwareTest extends CamelTestSupport {
+
+    SpringRabbitMQSendDynamicAware springRabbitMQSendDynamicAware;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        this.springRabbitMQSendDynamicAware = new SpringRabbitMQSendDynamicAware();
+    }
+
+    @Test
+    public void testUriParsing() throws Exception {
+        this.springRabbitMQSendDynamicAware.setScheme("spring-rabbitmq");
+        Exchange exchange = createExchangeWithBody("The Body");
+        SendDynamicAware.DynamicAwareEntry entry = new SendDynamicAware.DynamicAwareEntry("spring-rabbitmq:destination", "spring-rabbitmq:${header.test}", null, null);
+        Processor processor = this.springRabbitMQSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"), entry);
+        processor.process(exchange);
+        assertEquals("destination", exchange.getMessage().getHeader(SpringRabbitMQConstants.EXCHANGE_OVERRIDE_NAME));
+    }
+
+    @Test
+    public void testSlashedUriParsing() throws Exception {
+        this.springRabbitMQSendDynamicAware.setScheme("spring-rabbitmq");
+        Exchange exchange = createExchangeWithBody("The Body");
+        SendDynamicAware.DynamicAwareEntry entry = new SendDynamicAware.DynamicAwareEntry("spring-rabbitmq://destination", "spring-rabbitmq://${header.test}", null, null);
+        Processor processor = this.springRabbitMQSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"), entry);
+        processor.process(exchange);
+        assertEquals("destination", exchange.getMessage().getHeader(SpringRabbitMQConstants.EXCHANGE_OVERRIDE_NAME));
+    }
+}
diff --git a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaSendDynamicAware.java b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaSendDynamicAware.java
index db901d6b120..d0e5323c83c 100644
--- a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaSendDynamicAware.java
+++ b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaSendDynamicAware.java
@@ -104,6 +104,7 @@ public class VertxKafkaSendDynamicAware extends ServiceSupport implements SendDy
 
     private String parseTopicName(String uri) {
         // strip query
+        uri = uri.replaceFirst(scheme + "://", ":");
         int pos = uri.indexOf('?');
         if (pos != -1) {
             uri = uri.substring(0, pos);
diff --git a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/VertxKafkaSendDynamicAwareTest.java b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/VertxKafkaSendDynamicAwareTest.java
new file mode 100644
index 00000000000..fe8c6f417f8
--- /dev/null
+++ b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/VertxKafkaSendDynamicAwareTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.kafka;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.SendDynamicAware;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class VertxKafkaSendDynamicAwareTest extends CamelTestSupport {
+
+    VertxKafkaSendDynamicAware vertxKafkaSendDynamicAware;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        this.vertxKafkaSendDynamicAware = new VertxKafkaSendDynamicAware();
+    }
+
+    @Test
+    public void testUriParsing() throws Exception {
+        this.vertxKafkaSendDynamicAware.setScheme("vertx-kafka");
+        Exchange exchange = createExchangeWithBody("The Body");
+        SendDynamicAware.DynamicAwareEntry entry = new SendDynamicAware.DynamicAwareEntry("vertx-kafka:destination", "vertx-kafka:${header.test}", null, null);
+        Processor processor = this.vertxKafkaSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"), entry);
+        processor.process(exchange);
+        assertEquals("destination", exchange.getMessage().getHeader(VertxKafkaConstants.OVERRIDE_TOPIC));
+    }
+
+    @Test
+    public void testSlashedUriParsing() throws Exception {
+        this.vertxKafkaSendDynamicAware.setScheme("vertx-kafka");
+        Exchange exchange = createExchangeWithBody("The Body");
+        SendDynamicAware.DynamicAwareEntry entry = new SendDynamicAware.DynamicAwareEntry("vertx-kafka://destination", "vertx-kafka://${header.test}", null, null);
+        Processor processor = this.vertxKafkaSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"), entry);
+        processor.process(exchange);
+        assertEquals("destination", exchange.getMessage().getHeader(VertxKafkaConstants.OVERRIDE_TOPIC));
+    }
+}