You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by fm...@apache.org on 2023/07/18 11:14:24 UTC

[camel] branch main updated: CAMEL-19610: Upgrade to RocketMQ v5

This is an automated email from the ASF dual-hosted git repository.

fmariani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 99b01346845 CAMEL-19610: Upgrade to RocketMQ v5
99b01346845 is described below

commit 99b0134684548c775c27ba1870084d437c7ce15b
Author: Croway <fe...@gmail.com>
AuthorDate: Tue Jul 18 12:07:26 2023 +0200

    CAMEL-19610: Upgrade to RocketMQ v5
---
 components/camel-rocketmq/pom.xml                  |  31 +-----
 .../rocketmq/RocketMQRequestReplyRouteTest.java    |  39 ++++---
 .../component/rocketmq/RocketMQRouteTest.java      |  30 +++---
 .../rocketmq/infra/EmbeddedRocketMQServer.java     | 101 -------------------
 parent/pom.xml                                     |   2 +-
 pom.xml                                            |   1 +
 test-infra/camel-test-infra-rocketmq/pom.xml       |  54 ++++++++++
 .../src/main/resources/META-INF/MANIFEST.MF        |   0
 .../infra/rocketmq/common/RocketMQProperties.java  |  31 ++++++
 .../rocketmq/services/RocketMQBrokerContainer.java |  49 +++++++++
 .../infra/rocketmq/services/RocketMQContainer.java | 112 +++++++++++++++++++++
 .../services/RocketMQNameserverContainer.java      |  39 +++++++
 .../infra/rocketmq/services/RocketMQService.java   |  35 +++++++
 .../rocketmq/services/RocketMQServiceFactory.java  |  35 +++++++
 .../src/test/resources/broker1/broker1.conf        |  24 +++++
 .../src/test/resources/broker2/broker2.conf        |  24 +++++
 test-infra/pom.xml                                 |   1 +
 17 files changed, 440 insertions(+), 168 deletions(-)

diff --git a/components/camel-rocketmq/pom.xml b/components/camel-rocketmq/pom.xml
index 695f40801d1..8aa781e868e 100644
--- a/components/camel-rocketmq/pom.xml
+++ b/components/camel-rocketmq/pom.xml
@@ -32,10 +32,6 @@
     <name>Camel :: RocketMQ</name>
     <description>Camel RocketMQ Component</description>
 
-    <properties>
-        <camel.surefire.fork.additional-vmargs>--add-opens java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.ref=ALL-UNNAMED</camel.surefire.fork.additional-vmargs>
-    </properties>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.camel</groupId>
@@ -54,31 +50,14 @@
 
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-test-junit5</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-namesrv</artifactId>
-            <version>${rocketmq-version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-broker</artifactId>
-            <version>${rocketmq-version}</version>
+            <artifactId>camel-test-infra-rocketmq</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-test</artifactId>
-            <version>${rocketmq-version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>junit</groupId>
-                    <artifactId>junit</artifactId>
-                </exclusion>
-            </exclusions>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-junit5</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java
index 78f801834ae..8d0650b1517 100644
--- a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java
+++ b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java
@@ -17,34 +17,31 @@
 
 package org.apache.camel.component.rocketmq;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.rocketmq.infra.EmbeddedRocketMQServer;
+import org.apache.camel.test.infra.rocketmq.services.RocketMQService;
+import org.apache.camel.test.infra.rocketmq.services.RocketMQServiceFactory;
 import org.apache.camel.test.junit5.CamelTestSupport;
-import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.namesrv.NamesrvController;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 public class RocketMQRequestReplyRouteTest extends CamelTestSupport {
 
-    private static final int NAMESRV_PORT = 59877;
-
-    private static final String NAMESRV_ADDR = "127.0.0.1:" + NAMESRV_PORT;
-
     private static final String START_ENDPOINT_URI = "rocketmq:START_TOPIC?producerGroup=p1&consumerGroup=c1";
 
     private static final String INTERMEDIATE_ENDPOINT_URI = "rocketmq:INTERMEDIATE_TOPIC" +
@@ -58,23 +55,20 @@ public class RocketMQRequestReplyRouteTest extends CamelTestSupport {
 
     private static final String EXPECTED_MESSAGE = "Hi.";
 
-    private static NamesrvController namesrvController;
-
-    private static BrokerController brokerController;
-
     private MockEndpoint resultEndpoint;
 
     private DefaultMQPushConsumer replierConsumer;
 
     private DefaultMQProducer replierProducer;
 
+    @RegisterExtension
+    public static RocketMQService rocketMQService = RocketMQServiceFactory.createService();
+
     @BeforeAll
     static void beforeAll() throws Exception {
-        namesrvController = EmbeddedRocketMQServer.createAndStartNamesrv(NAMESRV_PORT);
-        brokerController = EmbeddedRocketMQServer.createAndStartBroker(NAMESRV_ADDR);
-        EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster", "START_TOPIC");
-        EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster", "INTERMEDIATE_TOPIC");
-        EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster", "REPLY_TO_TOPIC");
+        rocketMQService.createTopic("START_TOPIC");
+        rocketMQService.createTopic("INTERMEDIATE_TOPIC");
+        rocketMQService.createTopic("REPLY_TO_TOPIC");
     }
 
     @Override
@@ -83,10 +77,10 @@ public class RocketMQRequestReplyRouteTest extends CamelTestSupport {
         super.setUp();
         resultEndpoint = (MockEndpoint) context.getEndpoint(RESULT_ENDPOINT_URI);
         replierProducer = new DefaultMQProducer("replierProducer");
-        replierProducer.setNamesrvAddr(NAMESRV_ADDR);
+        replierProducer.setNamesrvAddr(rocketMQService.nameserverAddress());
         replierProducer.start();
         replierConsumer = new DefaultMQPushConsumer("replierConsumer");
-        replierConsumer.setNamesrvAddr(NAMESRV_ADDR);
+        replierConsumer.setNamesrvAddr(rocketMQService.nameserverAddress());
         replierConsumer.subscribe("INTERMEDIATE_TOPIC", "*");
         replierConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, unused) -> {
             MessageExt messageExt = msgs.get(0);
@@ -106,7 +100,7 @@ public class RocketMQRequestReplyRouteTest extends CamelTestSupport {
     protected CamelContext createCamelContext() throws Exception {
         CamelContext camelContext = super.createCamelContext();
         RocketMQComponent rocketMQComponent = new RocketMQComponent();
-        rocketMQComponent.setNamesrvAddr(NAMESRV_ADDR);
+        rocketMQComponent.setNamesrvAddr(rocketMQService.nameserverAddress());
         camelContext.addComponent("rocketmq", rocketMQComponent);
         return camelContext;
     }
@@ -141,8 +135,9 @@ public class RocketMQRequestReplyRouteTest extends CamelTestSupport {
     }
 
     @AfterAll
-    public static void afterAll() {
-        brokerController.shutdown();
-        namesrvController.shutdown();
+    public static void afterAll() throws IOException, InterruptedException {
+        rocketMQService.deleteTopic("START_TOPIC");
+        rocketMQService.deleteTopic("INTERMEDIATE_TOPIC");
+        rocketMQService.deleteTopic("REPLY_TO_TOPIC");
     }
 }
diff --git a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java
index 8deba43ad8d..d1eaed5c72e 100644
--- a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java
+++ b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java
@@ -17,41 +17,36 @@
 
 package org.apache.camel.component.rocketmq;
 
+import java.io.IOException;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.rocketmq.infra.EmbeddedRocketMQServer;
+import org.apache.camel.test.infra.rocketmq.services.RocketMQService;
+import org.apache.camel.test.infra.rocketmq.services.RocketMQServiceFactory;
 import org.apache.camel.test.junit5.CamelTestSupport;
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.namesrv.NamesrvController;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 public class RocketMQRouteTest extends CamelTestSupport {
 
     public static final String EXPECTED_MESSAGE = "hello, RocketMQ.";
 
-    private static final int NAMESRV_PORT = 59876;
-
-    private static final String NAMESRV_ADDR = "127.0.0.1:" + NAMESRV_PORT;
-
     private static final String START_ENDPOINT_URI = "rocketmq:START_TOPIC?producerGroup=p1&consumerGroup=c1&sendTag=startTag";
 
     private static final String RESULT_ENDPOINT_URI = "mock:result";
 
-    private static NamesrvController namesrvController;
-
-    private static BrokerController brokerController;
-
     private MockEndpoint resultEndpoint;
 
+    @RegisterExtension
+    public static RocketMQService rocketMQService = RocketMQServiceFactory.createService();
+
     @BeforeAll
     static void beforeAll() throws Exception {
-        namesrvController = EmbeddedRocketMQServer.createAndStartNamesrv(NAMESRV_PORT);
-        brokerController = EmbeddedRocketMQServer.createAndStartBroker(NAMESRV_ADDR);
-        EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster", "START_TOPIC");
+        rocketMQService.createTopic("START_TOPIC");
     }
 
     @Override
@@ -65,7 +60,7 @@ public class RocketMQRouteTest extends CamelTestSupport {
     protected CamelContext createCamelContext() throws Exception {
         CamelContext camelContext = super.createCamelContext();
         RocketMQComponent rocketMQComponent = new RocketMQComponent();
-        rocketMQComponent.setNamesrvAddr(NAMESRV_ADDR);
+        rocketMQComponent.setNamesrvAddr(rocketMQService.nameserverAddress());
         camelContext.addComponent("rocketmq", rocketMQComponent);
         return camelContext;
     }
@@ -93,8 +88,7 @@ public class RocketMQRouteTest extends CamelTestSupport {
     }
 
     @AfterAll
-    public static void afterAll() {
-        brokerController.shutdown();
-        namesrvController.shutdown();
+    public static void afterAll() throws IOException, InterruptedException {
+        rocketMQService.deleteTopic("START_TOPIC");
     }
 }
diff --git a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/infra/EmbeddedRocketMQServer.java b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/infra/EmbeddedRocketMQServer.java
deleted file mode 100644
index 153f6c4ca47..00000000000
--- a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/infra/EmbeddedRocketMQServer.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.rocketmq.infra;
-
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.namesrv.NamesrvConfig;
-import org.apache.rocketmq.namesrv.NamesrvController;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.test.util.MQAdmin;
-import org.apache.rocketmq.test.util.TestUtils;
-
-public final class EmbeddedRocketMQServer {
-
-    private static final AtomicInteger BROKER_INDEX = new AtomicInteger();
-
-    private static final AtomicInteger BROKER_PORTS = new AtomicInteger(61000);
-
-    private EmbeddedRocketMQServer() {
-    }
-
-    public static NamesrvController createAndStartNamesrv(int port) throws Exception {
-        NettyServerConfig serverConfig = new NettyServerConfig();
-        serverConfig.setListenPort(port);
-        NamesrvController result = new NamesrvController(new NamesrvConfig(), serverConfig);
-        result.initialize();
-        result.start();
-        return result;
-    }
-
-    public static BrokerController createAndStartBroker(String nsAddr) throws Exception {
-        NettyServerConfig nettyServerConfig = new NettyServerConfig();
-        nettyServerConfig.setListenPort(BROKER_PORTS.getAndIncrement());
-        BrokerController result = new BrokerController(
-                prepareBrokerConfig(nsAddr), nettyServerConfig, new NettyClientConfig(), prepareMessageStoreConfig());
-        result.initialize();
-        result.start();
-        return result;
-    }
-
-    private static BrokerConfig prepareBrokerConfig(final String nsAddr) {
-        BrokerConfig brokerConfig = new BrokerConfig();
-        brokerConfig.setNamesrvAddr(nsAddr);
-        brokerConfig.setBrokerName("CamelRocketMQBroker" + BROKER_INDEX.getAndIncrement());
-        brokerConfig.setBrokerIP1("127.0.0.1");
-        brokerConfig.setSendMessageThreadPoolNums(1);
-        brokerConfig.setPutMessageFutureThreadPoolNums(1);
-        brokerConfig.setPullMessageThreadPoolNums(1);
-        brokerConfig.setProcessReplyMessageThreadPoolNums(1);
-        brokerConfig.setQueryMessageThreadPoolNums(1);
-        brokerConfig.setAdminBrokerThreadPoolNums(1);
-        brokerConfig.setClientManageThreadPoolNums(1);
-        brokerConfig.setConsumerManageThreadPoolNums(1);
-        brokerConfig.setHeartbeatThreadPoolNums(1);
-        return brokerConfig;
-    }
-
-    private static MessageStoreConfig prepareMessageStoreConfig() {
-        String baseDir = System.getProperty("java.io.tmpdir") + "/embedded_rocketmq/" + System.currentTimeMillis();
-        MessageStoreConfig storeConfig = new MessageStoreConfig();
-        storeConfig.setStorePathRootDir(baseDir);
-        storeConfig.setStorePathCommitLog(baseDir + "/commitlog");
-        storeConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10);
-        storeConfig.setMaxIndexNum(100);
-        storeConfig.setMaxHashSlotNum(400);
-        storeConfig.setHaListenPort(BROKER_PORTS.getAndIncrement());
-        return storeConfig;
-    }
-
-    public static void createTopic(String namesrvAddr, String defaultCluster, String topic) throws TimeoutException {
-        long startTime = System.currentTimeMillis();
-        while (!MQAdmin.createTopic(namesrvAddr, defaultCluster, topic, 4, 3)) {
-            if (System.currentTimeMillis() - startTime > 30 * 1000) {
-                throw new TimeoutException(
-                        String.format("Failed to create topic [%s] after %d ms", topic,
-                                System.currentTimeMillis() - startTime));
-            }
-            TestUtils.waitForMoment(500);
-        }
-    }
-}
diff --git a/parent/pom.xml b/parent/pom.xml
index ce6e4fd75e2..2ba089114b4 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -415,7 +415,7 @@
         <rest-assured-version>5.3.1</rest-assured-version>
         <roaster-version>2.28.0.Final</roaster-version>
         <robotframework-version>4.1.2</robotframework-version>
-        <rocketmq-version>4.9.7</rocketmq-version>
+        <rocketmq-version>5.1.3</rocketmq-version>
         <rome-version>2.1.0</rome-version>
         <rssreader-version>3.4.5</rssreader-version>
         <rxjava2-version>2.2.21</rxjava2-version>
diff --git a/pom.xml b/pom.xml
index 68269a4c855..1b9a1ec7b7c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -343,6 +343,7 @@
                         </excludes>
                         <mapping>
                             <Adapter>CAMEL_PROPERTIES_STYLE</Adapter>
+                            <conf>SCRIPT_STYLE</conf>
                             <Dockerfile.jvm>SCRIPT_STYLE</Dockerfile.jvm>
                             <Dockerfile.legacy-jar>SCRIPT_STYLE</Dockerfile.legacy-jar>
                             <Dockerfile.native-micro>SCRIPT_STYLE</Dockerfile.native-micro>
diff --git a/test-infra/camel-test-infra-rocketmq/pom.xml b/test-infra/camel-test-infra-rocketmq/pom.xml
new file mode 100644
index 00000000000..696f098d2ee
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>camel-test-infra-parent</artifactId>
+        <groupId>org.apache.camel</groupId>
+        <relativePath>../camel-test-infra-parent/pom.xml</relativePath>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>camel-test-infra-rocketmq</artifactId>
+    <name>Camel :: Test Infra :: RocketMQ</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-infra-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>${awaitility-version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>${testcontainers-version}</version>
+        </dependency>
+    </dependencies>
+
+
+</project>
diff --git a/test-infra/camel-test-infra-rocketmq/src/main/resources/META-INF/MANIFEST.MF b/test-infra/camel-test-infra-rocketmq/src/main/resources/META-INF/MANIFEST.MF
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/common/RocketMQProperties.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/common/RocketMQProperties.java
new file mode 100644
index 00000000000..065d1040e5e
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/common/RocketMQProperties.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.infra.rocketmq.common;
+
+public final class RocketMQProperties {
+
+    public static final String ROCKETMQ_VERSION_PROPERTY = "itest.rocketmq.container.image.version";
+    public static final String ROCKETMQ_IMAGE_PROPERTY = "itest.rocketmq.container.image";
+    public static final int ROCKETMQ_NAMESRV_PORT = 9876;
+    public static final int ROCKETMQ_BROKER1_PORT = 10909;
+    public static final int ROCKETMQ_BROKER2_PORT = 10911;
+    public static final int ROCKETMQ_BROKER3_PORT = 10912;
+
+    private RocketMQProperties() {
+
+    }
+}
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQBrokerContainer.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQBrokerContainer.java
new file mode 100644
index 00000000000..a3b5f48a132
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQBrokerContainer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.infra.rocketmq.services;
+
+import java.util.Collections;
+
+import org.apache.camel.test.infra.rocketmq.common.RocketMQProperties;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+public class RocketMQBrokerContainer extends GenericContainer<RocketMQBrokerContainer> {
+
+    public RocketMQBrokerContainer(Network network, String confName) {
+        super(RocketMQContainer.ROCKETMQ_IMAGE);
+
+        withNetwork(network);
+        withExposedPorts(RocketMQProperties.ROCKETMQ_BROKER3_PORT,
+                RocketMQProperties.ROCKETMQ_BROKER2_PORT,
+                RocketMQProperties.ROCKETMQ_BROKER1_PORT);
+        withEnv("NAMESRV_ADDR", "nameserver:9876");
+        withClasspathResourceMapping(confName + "/" + confName + ".conf",
+                "/opt/rocketmq-" + RocketMQContainer.ROCKETMQ_VERSION + "/conf/broker.conf",
+                BindMode.READ_WRITE);
+
+        withTmpFs(Collections.singletonMap("/home/rocketmq/store", "rw"));
+        withTmpFs(Collections.singletonMap("/home/rocketmq/logs", "rw"));
+        withCommand("sh", "mqbroker",
+                "-c", "/opt/rocketmq-" + RocketMQContainer.ROCKETMQ_VERSION + "/conf/broker.conf");
+
+        waitingFor(Wait.forListeningPort());
+        withCreateContainerCmdModifier(cmd -> cmd.withName(confName));
+    }
+}
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQContainer.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQContainer.java
new file mode 100644
index 00000000000..8850356e393
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQContainer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.infra.rocketmq.services;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.test.infra.common.services.ContainerService;
+import org.apache.camel.test.infra.rocketmq.common.RocketMQProperties;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.Network;
+
+public class RocketMQContainer implements RocketMQService, ContainerService<RocketMQNameserverContainer> {
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQContainer.class);
+    public static final String ROCKETMQ_VERSION = System.getProperty(RocketMQProperties.ROCKETMQ_VERSION_PROPERTY,
+            "5.1.3");
+    public static final String ROCKETMQ_IMAGE = System.getProperty(RocketMQProperties.ROCKETMQ_IMAGE_PROPERTY,
+            "apache/rocketmq:" + ROCKETMQ_VERSION);
+
+    private final RocketMQNameserverContainer nameserverContainer;
+    private final RocketMQBrokerContainer brokerContainer1;
+    private final RocketMQBrokerContainer brokerContainer2;
+
+    public RocketMQContainer() {
+        Network network = Network.newNetwork();
+
+        nameserverContainer = new RocketMQNameserverContainer(network);
+
+        brokerContainer1 = new RocketMQBrokerContainer(network, "broker1");
+        brokerContainer2 = new RocketMQBrokerContainer(network, "broker2");
+    }
+
+    @Override
+    public RocketMQNameserverContainer getContainer() {
+        return nameserverContainer;
+    }
+
+    @Override
+    public void registerProperties() {
+
+    }
+
+    @Override
+    public void initialize() {
+        nameserverContainer.start();
+        LOG.info("Apache RocketMQ running at address {}", nameserverAddress());
+
+        brokerContainer1.start();
+        brokerContainer2.start();
+    }
+
+    @Override
+    public void shutdown() {
+        nameserverContainer.stop();
+        brokerContainer1.stop();
+        brokerContainer2.stop();
+    }
+
+    @Override
+    public void afterTestExecution(ExtensionContext extensionContext) throws Exception {
+
+    }
+
+    @Override
+    public void beforeTestExecution(ExtensionContext extensionContext) throws Exception {
+
+    }
+
+    public void createTopic(String topic) {
+        Awaitility.await()
+                .atMost(20, TimeUnit.SECONDS)
+                .pollDelay(100, TimeUnit.MILLISECONDS).until(() -> {
+                    Container.ExecResult execResult = brokerContainer1.execInContainer(
+                            "sh", "mqadmin", "updateTopic", "-n", "nameserver:9876", "-t",
+                            topic, "-c", "DefaultCluster");
+
+                    LOG.info(execResult.getExitCode() + " " + execResult.getStderr() + " " + execResult.getStdout());
+
+                    return execResult.getStdout() != null && execResult.getStdout().contains("success");
+                });
+    }
+
+    public void deleteTopic(String topic) throws IOException, InterruptedException {
+        brokerContainer1.execInContainer(
+                "sh", "mqadmin", "deleteTopic", "-n", "nameserver:9876", "-t",
+                topic);
+    }
+
+    @Override
+    public String nameserverAddress() {
+        return nameserverContainer.getHost() + ":"
+               + nameserverContainer.getMappedPort(RocketMQProperties.ROCKETMQ_NAMESRV_PORT);
+    }
+}
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQNameserverContainer.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQNameserverContainer.java
new file mode 100644
index 00000000000..fb3e57d5256
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQNameserverContainer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.infra.rocketmq.services;
+
+import java.util.Collections;
+
+import org.apache.camel.test.infra.rocketmq.common.RocketMQProperties;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+public class RocketMQNameserverContainer extends GenericContainer<RocketMQNameserverContainer> {
+    public RocketMQNameserverContainer(Network network) {
+        super(RocketMQContainer.ROCKETMQ_IMAGE);
+
+        withNetwork(network);
+        withNetworkAliases("nameserver");
+        addExposedPort(RocketMQProperties.ROCKETMQ_NAMESRV_PORT);
+        withTmpFs(Collections.singletonMap("/home/rocketmq/logs", "rw"));
+        withCommand("sh", "mqnamesrv");
+        withCreateContainerCmdModifier(cmd -> cmd.withName("nameserver"));
+
+        waitingFor(Wait.forListeningPort());
+    }
+}
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQService.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQService.java
new file mode 100644
index 00000000000..0334269d392
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.infra.rocketmq.services;
+
+import java.io.IOException;
+
+import org.apache.camel.test.infra.common.services.TestService;
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
+import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
+
+public interface RocketMQService extends TestService, BeforeTestExecutionCallback, AfterTestExecutionCallback {
+    String nameserverAddress();
+
+    default String defaultCluster() {
+        return "DefaultCluster";
+    }
+
+    void createTopic(String topic);
+
+    void deleteTopic(String topic) throws IOException, InterruptedException;
+}
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQServiceFactory.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQServiceFactory.java
new file mode 100644
index 00000000000..58124d9f73e
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQServiceFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.infra.rocketmq.services;
+
+import org.apache.camel.test.infra.common.services.SimpleTestServiceBuilder;
+
+public final class RocketMQServiceFactory {
+    private RocketMQServiceFactory() {
+
+    }
+
+    public static SimpleTestServiceBuilder<RocketMQService> builder() {
+        return new SimpleTestServiceBuilder<>("rocketmq");
+    }
+
+    public static RocketMQService createService() {
+        return builder()
+                .addLocalMapping(RocketMQContainer::new)
+                .build();
+    }
+}
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/resources/broker1/broker1.conf b/test-infra/camel-test-infra-rocketmq/src/test/resources/broker1/broker1.conf
new file mode 100644
index 00000000000..4e24028a2af
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/resources/broker1/broker1.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+brokerClusterName = DefaultCluster
+brokerName = broker-a
+brokerId = 0
+deleteWhen = 04
+fileReservedTime = 48
+brokerRole = ASYNC_MASTER
+flushDiskType = ASYNC_FLUSH
\ No newline at end of file
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/resources/broker2/broker2.conf b/test-infra/camel-test-infra-rocketmq/src/test/resources/broker2/broker2.conf
new file mode 100644
index 00000000000..66b7fde12ba
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/resources/broker2/broker2.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+brokerClusterName = DefaultCluster
+brokerName = broker-b
+brokerId = 0
+deleteWhen = 04
+fileReservedTime = 48
+brokerRole = ASYNC_MASTER
+flushDiskType = ASYNC_FLUSH
\ No newline at end of file
diff --git a/test-infra/pom.xml b/test-infra/pom.xml
index 5a0627502bd..f27953aa22f 100644
--- a/test-infra/pom.xml
+++ b/test-infra/pom.xml
@@ -60,6 +60,7 @@
         <module>camel-test-infra-nats</module>
         <module>camel-test-infra-pulsar</module>
         <module>camel-test-infra-redis</module>
+        <module>camel-test-infra-rocketmq</module>
         <module>camel-test-infra-xmpp</module>
         <module>camel-test-infra-zookeeper</module>
         <module>camel-test-infra-postgres</module>