You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by fm...@apache.org on 2023/07/18 11:14:24 UTC
[camel] branch main updated: CAMEL-19610: Upgrade to RocketMQ v5
This is an automated email from the ASF dual-hosted git repository.
fmariani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 99b01346845 CAMEL-19610: Upgrade to RocketMQ v5
99b01346845 is described below
commit 99b0134684548c775c27ba1870084d437c7ce15b
Author: Croway <fe...@gmail.com>
AuthorDate: Tue Jul 18 12:07:26 2023 +0200
CAMEL-19610: Upgrade to RocketMQ v5
---
components/camel-rocketmq/pom.xml | 31 +-----
.../rocketmq/RocketMQRequestReplyRouteTest.java | 39 ++++---
.../component/rocketmq/RocketMQRouteTest.java | 30 +++---
.../rocketmq/infra/EmbeddedRocketMQServer.java | 101 -------------------
parent/pom.xml | 2 +-
pom.xml | 1 +
test-infra/camel-test-infra-rocketmq/pom.xml | 54 ++++++++++
.../src/main/resources/META-INF/MANIFEST.MF | 0
.../infra/rocketmq/common/RocketMQProperties.java | 31 ++++++
.../rocketmq/services/RocketMQBrokerContainer.java | 49 +++++++++
.../infra/rocketmq/services/RocketMQContainer.java | 112 +++++++++++++++++++++
.../services/RocketMQNameserverContainer.java | 39 +++++++
.../infra/rocketmq/services/RocketMQService.java | 35 +++++++
.../rocketmq/services/RocketMQServiceFactory.java | 35 +++++++
.../src/test/resources/broker1/broker1.conf | 24 +++++
.../src/test/resources/broker2/broker2.conf | 24 +++++
test-infra/pom.xml | 1 +
17 files changed, 440 insertions(+), 168 deletions(-)
diff --git a/components/camel-rocketmq/pom.xml b/components/camel-rocketmq/pom.xml
index 695f40801d1..8aa781e868e 100644
--- a/components/camel-rocketmq/pom.xml
+++ b/components/camel-rocketmq/pom.xml
@@ -32,10 +32,6 @@
<name>Camel :: RocketMQ</name>
<description>Camel RocketMQ Component</description>
- <properties>
- <camel.surefire.fork.additional-vmargs>--add-opens java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.ref=ALL-UNNAMED</camel.surefire.fork.additional-vmargs>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
@@ -54,31 +50,14 @@
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-test-junit5</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-namesrv</artifactId>
- <version>${rocketmq-version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-broker</artifactId>
- <version>${rocketmq-version}</version>
+ <artifactId>camel-test-infra-rocketmq</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-test</artifactId>
- <version>${rocketmq-version}</version>
- <exclusions>
- <exclusion>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-junit5</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java
index 78f801834ae..8d0650b1517 100644
--- a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java
+++ b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java
@@ -17,34 +17,31 @@
package org.apache.camel.component.rocketmq;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.camel.CamelContext;
import org.apache.camel.ExchangePattern;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.rocketmq.infra.EmbeddedRocketMQServer;
+import org.apache.camel.test.infra.rocketmq.services.RocketMQService;
+import org.apache.camel.test.infra.rocketmq.services.RocketMQServiceFactory;
import org.apache.camel.test.junit5.CamelTestSupport;
-import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.namesrv.NamesrvController;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
public class RocketMQRequestReplyRouteTest extends CamelTestSupport {
- private static final int NAMESRV_PORT = 59877;
-
- private static final String NAMESRV_ADDR = "127.0.0.1:" + NAMESRV_PORT;
-
private static final String START_ENDPOINT_URI = "rocketmq:START_TOPIC?producerGroup=p1&consumerGroup=c1";
private static final String INTERMEDIATE_ENDPOINT_URI = "rocketmq:INTERMEDIATE_TOPIC" +
@@ -58,23 +55,20 @@ public class RocketMQRequestReplyRouteTest extends CamelTestSupport {
private static final String EXPECTED_MESSAGE = "Hi.";
- private static NamesrvController namesrvController;
-
- private static BrokerController brokerController;
-
private MockEndpoint resultEndpoint;
private DefaultMQPushConsumer replierConsumer;
private DefaultMQProducer replierProducer;
+ @RegisterExtension
+ public static RocketMQService rocketMQService = RocketMQServiceFactory.createService();
+
@BeforeAll
static void beforeAll() throws Exception {
- namesrvController = EmbeddedRocketMQServer.createAndStartNamesrv(NAMESRV_PORT);
- brokerController = EmbeddedRocketMQServer.createAndStartBroker(NAMESRV_ADDR);
- EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster", "START_TOPIC");
- EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster", "INTERMEDIATE_TOPIC");
- EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster", "REPLY_TO_TOPIC");
+ rocketMQService.createTopic("START_TOPIC");
+ rocketMQService.createTopic("INTERMEDIATE_TOPIC");
+ rocketMQService.createTopic("REPLY_TO_TOPIC");
}
@Override
@@ -83,10 +77,10 @@ public class RocketMQRequestReplyRouteTest extends CamelTestSupport {
super.setUp();
resultEndpoint = (MockEndpoint) context.getEndpoint(RESULT_ENDPOINT_URI);
replierProducer = new DefaultMQProducer("replierProducer");
- replierProducer.setNamesrvAddr(NAMESRV_ADDR);
+ replierProducer.setNamesrvAddr(rocketMQService.nameserverAddress());
replierProducer.start();
replierConsumer = new DefaultMQPushConsumer("replierConsumer");
- replierConsumer.setNamesrvAddr(NAMESRV_ADDR);
+ replierConsumer.setNamesrvAddr(rocketMQService.nameserverAddress());
replierConsumer.subscribe("INTERMEDIATE_TOPIC", "*");
replierConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, unused) -> {
MessageExt messageExt = msgs.get(0);
@@ -106,7 +100,7 @@ public class RocketMQRequestReplyRouteTest extends CamelTestSupport {
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
RocketMQComponent rocketMQComponent = new RocketMQComponent();
- rocketMQComponent.setNamesrvAddr(NAMESRV_ADDR);
+ rocketMQComponent.setNamesrvAddr(rocketMQService.nameserverAddress());
camelContext.addComponent("rocketmq", rocketMQComponent);
return camelContext;
}
@@ -141,8 +135,9 @@ public class RocketMQRequestReplyRouteTest extends CamelTestSupport {
}
@AfterAll
- public static void afterAll() {
- brokerController.shutdown();
- namesrvController.shutdown();
+ public static void afterAll() throws IOException, InterruptedException {
+ rocketMQService.deleteTopic("START_TOPIC");
+ rocketMQService.deleteTopic("INTERMEDIATE_TOPIC");
+ rocketMQService.deleteTopic("REPLY_TO_TOPIC");
}
}
diff --git a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java
index 8deba43ad8d..d1eaed5c72e 100644
--- a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java
+++ b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java
@@ -17,41 +17,36 @@
package org.apache.camel.component.rocketmq;
+import java.io.IOException;
+
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.rocketmq.infra.EmbeddedRocketMQServer;
+import org.apache.camel.test.infra.rocketmq.services.RocketMQService;
+import org.apache.camel.test.infra.rocketmq.services.RocketMQServiceFactory;
import org.apache.camel.test.junit5.CamelTestSupport;
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.namesrv.NamesrvController;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
public class RocketMQRouteTest extends CamelTestSupport {
public static final String EXPECTED_MESSAGE = "hello, RocketMQ.";
- private static final int NAMESRV_PORT = 59876;
-
- private static final String NAMESRV_ADDR = "127.0.0.1:" + NAMESRV_PORT;
-
private static final String START_ENDPOINT_URI = "rocketmq:START_TOPIC?producerGroup=p1&consumerGroup=c1&sendTag=startTag";
private static final String RESULT_ENDPOINT_URI = "mock:result";
- private static NamesrvController namesrvController;
-
- private static BrokerController brokerController;
-
private MockEndpoint resultEndpoint;
+ @RegisterExtension
+ public static RocketMQService rocketMQService = RocketMQServiceFactory.createService();
+
@BeforeAll
static void beforeAll() throws Exception {
- namesrvController = EmbeddedRocketMQServer.createAndStartNamesrv(NAMESRV_PORT);
- brokerController = EmbeddedRocketMQServer.createAndStartBroker(NAMESRV_ADDR);
- EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster", "START_TOPIC");
+ rocketMQService.createTopic("START_TOPIC");
}
@Override
@@ -65,7 +60,7 @@ public class RocketMQRouteTest extends CamelTestSupport {
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
RocketMQComponent rocketMQComponent = new RocketMQComponent();
- rocketMQComponent.setNamesrvAddr(NAMESRV_ADDR);
+ rocketMQComponent.setNamesrvAddr(rocketMQService.nameserverAddress());
camelContext.addComponent("rocketmq", rocketMQComponent);
return camelContext;
}
@@ -93,8 +88,7 @@ public class RocketMQRouteTest extends CamelTestSupport {
}
@AfterAll
- public static void afterAll() {
- brokerController.shutdown();
- namesrvController.shutdown();
+ public static void afterAll() throws IOException, InterruptedException {
+ rocketMQService.deleteTopic("START_TOPIC");
}
}
diff --git a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/infra/EmbeddedRocketMQServer.java b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/infra/EmbeddedRocketMQServer.java
deleted file mode 100644
index 153f6c4ca47..00000000000
--- a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/infra/EmbeddedRocketMQServer.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.rocketmq.infra;
-
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.namesrv.NamesrvConfig;
-import org.apache.rocketmq.namesrv.NamesrvController;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.test.util.MQAdmin;
-import org.apache.rocketmq.test.util.TestUtils;
-
-public final class EmbeddedRocketMQServer {
-
- private static final AtomicInteger BROKER_INDEX = new AtomicInteger();
-
- private static final AtomicInteger BROKER_PORTS = new AtomicInteger(61000);
-
- private EmbeddedRocketMQServer() {
- }
-
- public static NamesrvController createAndStartNamesrv(int port) throws Exception {
- NettyServerConfig serverConfig = new NettyServerConfig();
- serverConfig.setListenPort(port);
- NamesrvController result = new NamesrvController(new NamesrvConfig(), serverConfig);
- result.initialize();
- result.start();
- return result;
- }
-
- public static BrokerController createAndStartBroker(String nsAddr) throws Exception {
- NettyServerConfig nettyServerConfig = new NettyServerConfig();
- nettyServerConfig.setListenPort(BROKER_PORTS.getAndIncrement());
- BrokerController result = new BrokerController(
- prepareBrokerConfig(nsAddr), nettyServerConfig, new NettyClientConfig(), prepareMessageStoreConfig());
- result.initialize();
- result.start();
- return result;
- }
-
- private static BrokerConfig prepareBrokerConfig(final String nsAddr) {
- BrokerConfig brokerConfig = new BrokerConfig();
- brokerConfig.setNamesrvAddr(nsAddr);
- brokerConfig.setBrokerName("CamelRocketMQBroker" + BROKER_INDEX.getAndIncrement());
- brokerConfig.setBrokerIP1("127.0.0.1");
- brokerConfig.setSendMessageThreadPoolNums(1);
- brokerConfig.setPutMessageFutureThreadPoolNums(1);
- brokerConfig.setPullMessageThreadPoolNums(1);
- brokerConfig.setProcessReplyMessageThreadPoolNums(1);
- brokerConfig.setQueryMessageThreadPoolNums(1);
- brokerConfig.setAdminBrokerThreadPoolNums(1);
- brokerConfig.setClientManageThreadPoolNums(1);
- brokerConfig.setConsumerManageThreadPoolNums(1);
- brokerConfig.setHeartbeatThreadPoolNums(1);
- return brokerConfig;
- }
-
- private static MessageStoreConfig prepareMessageStoreConfig() {
- String baseDir = System.getProperty("java.io.tmpdir") + "/embedded_rocketmq/" + System.currentTimeMillis();
- MessageStoreConfig storeConfig = new MessageStoreConfig();
- storeConfig.setStorePathRootDir(baseDir);
- storeConfig.setStorePathCommitLog(baseDir + "/commitlog");
- storeConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10);
- storeConfig.setMaxIndexNum(100);
- storeConfig.setMaxHashSlotNum(400);
- storeConfig.setHaListenPort(BROKER_PORTS.getAndIncrement());
- return storeConfig;
- }
-
- public static void createTopic(String namesrvAddr, String defaultCluster, String topic) throws TimeoutException {
- long startTime = System.currentTimeMillis();
- while (!MQAdmin.createTopic(namesrvAddr, defaultCluster, topic, 4, 3)) {
- if (System.currentTimeMillis() - startTime > 30 * 1000) {
- throw new TimeoutException(
- String.format("Failed to create topic [%s] after %d ms", topic,
- System.currentTimeMillis() - startTime));
- }
- TestUtils.waitForMoment(500);
- }
- }
-}
diff --git a/parent/pom.xml b/parent/pom.xml
index ce6e4fd75e2..2ba089114b4 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -415,7 +415,7 @@
<rest-assured-version>5.3.1</rest-assured-version>
<roaster-version>2.28.0.Final</roaster-version>
<robotframework-version>4.1.2</robotframework-version>
- <rocketmq-version>4.9.7</rocketmq-version>
+ <rocketmq-version>5.1.3</rocketmq-version>
<rome-version>2.1.0</rome-version>
<rssreader-version>3.4.5</rssreader-version>
<rxjava2-version>2.2.21</rxjava2-version>
diff --git a/pom.xml b/pom.xml
index 68269a4c855..1b9a1ec7b7c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -343,6 +343,7 @@
</excludes>
<mapping>
<Adapter>CAMEL_PROPERTIES_STYLE</Adapter>
+ <conf>SCRIPT_STYLE</conf>
<Dockerfile.jvm>SCRIPT_STYLE</Dockerfile.jvm>
<Dockerfile.legacy-jar>SCRIPT_STYLE</Dockerfile.legacy-jar>
<Dockerfile.native-micro>SCRIPT_STYLE</Dockerfile.native-micro>
diff --git a/test-infra/camel-test-infra-rocketmq/pom.xml b/test-infra/camel-test-infra-rocketmq/pom.xml
new file mode 100644
index 00000000000..696f098d2ee
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>camel-test-infra-parent</artifactId>
+ <groupId>org.apache.camel</groupId>
+ <relativePath>../camel-test-infra-parent/pom.xml</relativePath>
+ <version>4.0.0-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>camel-test-infra-rocketmq</artifactId>
+ <name>Camel :: Test Infra :: RocketMQ</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-infra-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>${awaitility-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainers-version}</version>
+ </dependency>
+ </dependencies>
+
+
+</project>
diff --git a/test-infra/camel-test-infra-rocketmq/src/main/resources/META-INF/MANIFEST.MF b/test-infra/camel-test-infra-rocketmq/src/main/resources/META-INF/MANIFEST.MF
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/common/RocketMQProperties.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/common/RocketMQProperties.java
new file mode 100644
index 00000000000..065d1040e5e
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/common/RocketMQProperties.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.infra.rocketmq.common;
+
+public final class RocketMQProperties {
+
+ public static final String ROCKETMQ_VERSION_PROPERTY = "itest.rocketmq.container.image.version";
+ public static final String ROCKETMQ_IMAGE_PROPERTY = "itest.rocketmq.container.image";
+ public static final int ROCKETMQ_NAMESRV_PORT = 9876;
+ public static final int ROCKETMQ_BROKER1_PORT = 10909;
+ public static final int ROCKETMQ_BROKER2_PORT = 10911;
+ public static final int ROCKETMQ_BROKER3_PORT = 10912;
+
+ private RocketMQProperties() {
+
+ }
+}
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQBrokerContainer.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQBrokerContainer.java
new file mode 100644
index 00000000000..a3b5f48a132
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQBrokerContainer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.infra.rocketmq.services;
+
+import java.util.Collections;
+
+import org.apache.camel.test.infra.rocketmq.common.RocketMQProperties;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+public class RocketMQBrokerContainer extends GenericContainer<RocketMQBrokerContainer> {
+
+ public RocketMQBrokerContainer(Network network, String confName) {
+ super(RocketMQContainer.ROCKETMQ_IMAGE);
+
+ withNetwork(network);
+ withExposedPorts(RocketMQProperties.ROCKETMQ_BROKER3_PORT,
+ RocketMQProperties.ROCKETMQ_BROKER2_PORT,
+ RocketMQProperties.ROCKETMQ_BROKER1_PORT);
+ withEnv("NAMESRV_ADDR", "nameserver:9876");
+ withClasspathResourceMapping(confName + "/" + confName + ".conf",
+ "/opt/rocketmq-" + RocketMQContainer.ROCKETMQ_VERSION + "/conf/broker.conf",
+ BindMode.READ_WRITE);
+
+ withTmpFs(Collections.singletonMap("/home/rocketmq/store", "rw"));
+ withTmpFs(Collections.singletonMap("/home/rocketmq/logs", "rw"));
+ withCommand("sh", "mqbroker",
+ "-c", "/opt/rocketmq-" + RocketMQContainer.ROCKETMQ_VERSION + "/conf/broker.conf");
+
+ waitingFor(Wait.forListeningPort());
+ withCreateContainerCmdModifier(cmd -> cmd.withName(confName));
+ }
+}
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQContainer.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQContainer.java
new file mode 100644
index 00000000000..8850356e393
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQContainer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.infra.rocketmq.services;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.test.infra.common.services.ContainerService;
+import org.apache.camel.test.infra.rocketmq.common.RocketMQProperties;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.Network;
+
+public class RocketMQContainer implements RocketMQService, ContainerService<RocketMQNameserverContainer> {
+ private static final Logger LOG = LoggerFactory.getLogger(RocketMQContainer.class);
+ public static final String ROCKETMQ_VERSION = System.getProperty(RocketMQProperties.ROCKETMQ_VERSION_PROPERTY,
+ "5.1.3");
+ public static final String ROCKETMQ_IMAGE = System.getProperty(RocketMQProperties.ROCKETMQ_IMAGE_PROPERTY,
+ "apache/rocketmq:" + ROCKETMQ_VERSION);
+
+ private final RocketMQNameserverContainer nameserverContainer;
+ private final RocketMQBrokerContainer brokerContainer1;
+ private final RocketMQBrokerContainer brokerContainer2;
+
+ public RocketMQContainer() {
+ Network network = Network.newNetwork();
+
+ nameserverContainer = new RocketMQNameserverContainer(network);
+
+ brokerContainer1 = new RocketMQBrokerContainer(network, "broker1");
+ brokerContainer2 = new RocketMQBrokerContainer(network, "broker2");
+ }
+
+ @Override
+ public RocketMQNameserverContainer getContainer() {
+ return nameserverContainer;
+ }
+
+ @Override
+ public void registerProperties() {
+
+ }
+
+ @Override
+ public void initialize() {
+ nameserverContainer.start();
+ LOG.info("Apache RocketMQ running at address {}", nameserverAddress());
+
+ brokerContainer1.start();
+ brokerContainer2.start();
+ }
+
+ @Override
+ public void shutdown() {
+ nameserverContainer.stop();
+ brokerContainer1.stop();
+ brokerContainer2.stop();
+ }
+
+ @Override
+ public void afterTestExecution(ExtensionContext extensionContext) throws Exception {
+
+ }
+
+ @Override
+ public void beforeTestExecution(ExtensionContext extensionContext) throws Exception {
+
+ }
+
+ public void createTopic(String topic) {
+ Awaitility.await()
+ .atMost(20, TimeUnit.SECONDS)
+ .pollDelay(100, TimeUnit.MILLISECONDS).until(() -> {
+ Container.ExecResult execResult = brokerContainer1.execInContainer(
+ "sh", "mqadmin", "updateTopic", "-n", "nameserver:9876", "-t",
+ topic, "-c", "DefaultCluster");
+
+ LOG.info(execResult.getExitCode() + " " + execResult.getStderr() + " " + execResult.getStdout());
+
+ return execResult.getStdout() != null && execResult.getStdout().contains("success");
+ });
+ }
+
+ public void deleteTopic(String topic) throws IOException, InterruptedException {
+ brokerContainer1.execInContainer(
+ "sh", "mqadmin", "deleteTopic", "-n", "nameserver:9876", "-t",
+ topic);
+ }
+
+ @Override
+ public String nameserverAddress() {
+ return nameserverContainer.getHost() + ":"
+ + nameserverContainer.getMappedPort(RocketMQProperties.ROCKETMQ_NAMESRV_PORT);
+ }
+}
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQNameserverContainer.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQNameserverContainer.java
new file mode 100644
index 00000000000..fb3e57d5256
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQNameserverContainer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.infra.rocketmq.services;
+
+import java.util.Collections;
+
+import org.apache.camel.test.infra.rocketmq.common.RocketMQProperties;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+public class RocketMQNameserverContainer extends GenericContainer<RocketMQNameserverContainer> {
+ public RocketMQNameserverContainer(Network network) {
+ super(RocketMQContainer.ROCKETMQ_IMAGE);
+
+ withNetwork(network);
+ withNetworkAliases("nameserver");
+ addExposedPort(RocketMQProperties.ROCKETMQ_NAMESRV_PORT);
+ withTmpFs(Collections.singletonMap("/home/rocketmq/logs", "rw"));
+ withCommand("sh", "mqnamesrv");
+ withCreateContainerCmdModifier(cmd -> cmd.withName("nameserver"));
+
+ waitingFor(Wait.forListeningPort());
+ }
+}
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQService.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQService.java
new file mode 100644
index 00000000000..0334269d392
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.infra.rocketmq.services;
+
+import java.io.IOException;
+
+import org.apache.camel.test.infra.common.services.TestService;
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
+import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
+
+public interface RocketMQService extends TestService, BeforeTestExecutionCallback, AfterTestExecutionCallback {
+ String nameserverAddress();
+
+ default String defaultCluster() {
+ return "DefaultCluster";
+ }
+
+ void createTopic(String topic);
+
+ void deleteTopic(String topic) throws IOException, InterruptedException;
+}
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQServiceFactory.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQServiceFactory.java
new file mode 100644
index 00000000000..58124d9f73e
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQServiceFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.infra.rocketmq.services;
+
+import org.apache.camel.test.infra.common.services.SimpleTestServiceBuilder;
+
+public final class RocketMQServiceFactory {
+ private RocketMQServiceFactory() {
+
+ }
+
+ public static SimpleTestServiceBuilder<RocketMQService> builder() {
+ return new SimpleTestServiceBuilder<>("rocketmq");
+ }
+
+ public static RocketMQService createService() {
+ return builder()
+ .addLocalMapping(RocketMQContainer::new)
+ .build();
+ }
+}
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/resources/broker1/broker1.conf b/test-infra/camel-test-infra-rocketmq/src/test/resources/broker1/broker1.conf
new file mode 100644
index 00000000000..4e24028a2af
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/resources/broker1/broker1.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+brokerClusterName = DefaultCluster
+brokerName = broker-a
+brokerId = 0
+deleteWhen = 04
+fileReservedTime = 48
+brokerRole = ASYNC_MASTER
+flushDiskType = ASYNC_FLUSH
\ No newline at end of file
diff --git a/test-infra/camel-test-infra-rocketmq/src/test/resources/broker2/broker2.conf b/test-infra/camel-test-infra-rocketmq/src/test/resources/broker2/broker2.conf
new file mode 100644
index 00000000000..66b7fde12ba
--- /dev/null
+++ b/test-infra/camel-test-infra-rocketmq/src/test/resources/broker2/broker2.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+brokerClusterName = DefaultCluster
+brokerName = broker-b
+brokerId = 0
+deleteWhen = 04
+fileReservedTime = 48
+brokerRole = ASYNC_MASTER
+flushDiskType = ASYNC_FLUSH
\ No newline at end of file
diff --git a/test-infra/pom.xml b/test-infra/pom.xml
index 5a0627502bd..f27953aa22f 100644
--- a/test-infra/pom.xml
+++ b/test-infra/pom.xml
@@ -60,6 +60,7 @@
<module>camel-test-infra-nats</module>
<module>camel-test-infra-pulsar</module>
<module>camel-test-infra-redis</module>
+ <module>camel-test-infra-rocketmq</module>
<module>camel-test-infra-xmpp</module>
<module>camel-test-infra-zookeeper</module>
<module>camel-test-infra-postgres</module>