You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/09/08 10:30:21 UTC

[camel-kafka-connector] 01/02: Added performance test infrastructure for RabbitMQ

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e76a79244bb2ad0174cdb3ad7a3f39ab246d2a39
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Fri Sep 4 07:32:46 2020 +0200

    Added performance test infrastructure for RabbitMQ
    
    This adds a reference implementation for checking the resource usage of
    the RabbitMQ component while iddle. The motivation for this is related
    to the github issue #414.
---
 docs/modules/ROOT/pages/testing.adoc               | 24 +++++++
 parent/pom.xml                                     |  7 ++
 tests/perf-tests-rabbitmq/pom.xml                  | 74 +++++++++++++++++++++
 .../rabbitmq/services/ConnectionProperties.java    | 25 +++++++
 .../services/RabbitMQLocalContainerService.java    | 67 +++++++++++++++++++
 .../rabbitmq/services/RabbitMQRemoteService.java   | 37 +++++++++++
 .../rabbitmq/services/RabbitMQService.java         | 54 +++++++++++++++
 .../rabbitmq/services/RabbitMQServiceFactory.java  | 45 +++++++++++++
 .../source/CamelRabbitMQPropertyFactory.java       | 76 ++++++++++++++++++++++
 .../source/RabbitMQSourcePerformanceITCase.java    | 72 ++++++++++++++++++++
 tests/pom.xml                                      |  1 +
 11 files changed, 482 insertions(+)

diff --git a/docs/modules/ROOT/pages/testing.adoc b/docs/modules/ROOT/pages/testing.adoc
index 9b59983..ce4de58 100644
--- a/docs/modules/ROOT/pages/testing.adoc
+++ b/docs/modules/ROOT/pages/testing.adoc
@@ -69,6 +69,8 @@ remote instances. At the moment, the following properties can be set for remote
 
 It is possible to use a properties file to set these. To do so use `-Dtest.properties=/path/to/file.properties`.
 
+=== Manual Tests
+
 A few manual tests can be enabled and executed with adequate configuration on the accounts and environments
 used by those services. This is very specific to the nature of each of those services, therefore please consult
 the comments on each of those test cases for the details related to their setup.
@@ -79,6 +81,28 @@ these flags along with others that are specific to that service:
 * it.test.salesforce.enable
 * it.test.slack.enable
 
+=== Performance Tests
+
+There is also a reference test implementation for checking the performance and resource usage in some situations.
+The current implementation, in the perf-tests-rabbitmq, can be run using:
+
+[source,bash]
+----
+mvn -DskipIntegrationTests=false -Dit.test.perf.enabled=true clean verify
+----
+
+Additional JVM settings, such as those to allow usage of the https://github.com/jvm-profiling-tools/async-profiler[Async Profiler]
+can be passed using the `jvm.user.settings` property. For example:
+
+[source,bash]
+----
+mvn -DskipIntegrationTests=false -Dit.test.perf.enabled=true -Djvm.user.settings="-agentpath:/path/to/asyncProfiler.so=start,file=/path/to/profile.svg" clean verify
+----
+
+The duration of the test can be adjusted with the `rabbitmq.test.duration`. This option takes the number of minutes for
+the test duration.
+
+
 [#writing-new-tests]
 == Writing New Tests
 
diff --git a/parent/pom.xml b/parent/pom.xml
index c423459..81ce35a 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -319,6 +319,13 @@
                 <version>${version.postgres}</version>
                 <scope>test</scope>
             </dependency>
+
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>rabbitmq</artifactId>
+                <version>${testcontainers-version}</version>
+                <scope>test</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <build>
diff --git a/tests/perf-tests-rabbitmq/pom.xml b/tests/perf-tests-rabbitmq/pom.xml
new file mode 100644
index 0000000..9379e0a
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.5.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>perf-tests-rabbitmq</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: Performance :: RabbitMQ</name>
+
+    <properties>
+        <jmx.port>9010</jmx.port>
+        <rmi.server>localhost</rmi.server>
+        <jvm.user.settings></jvm.user.settings>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-rabbitmq</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>rabbitmq</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <configuration>
+                    <argLine>${common.failsafe.args} ${jvm.user.settings} -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=${rmi.server} -Dcom.sun.management.jmxremote.rmi.port=${jmx.port}</argLine>
+                    <skipTests>${skipIntegrationTests}</skipTests>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+
+</project>
\ No newline at end of file
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java
new file mode 100644
index 0000000..15b7f8d
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+public interface ConnectionProperties {
+    String username();
+    String password();
+    String hostname();
+    int port();
+}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java
new file mode 100644
index 0000000..b6ae6d1
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.RabbitMQContainer;
+
+public class RabbitMQLocalContainerService implements RabbitMQService {
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQLocalContainerService.class);
+
+    private final RabbitMQContainer container = new RabbitMQContainer("rabbitmq:3.8-management");
+
+    public RabbitMQLocalContainerService() {
+        container.start();
+    }
+
+    @Override
+    public ConnectionProperties connectionProperties() {
+        return new ConnectionProperties() {
+            @Override
+            public String username() {
+                return container.getAdminUsername();
+            }
+
+            @Override
+            public String password() {
+                return container.getAdminPassword();
+            }
+
+            @Override
+            public String hostname() {
+                return container.getHost();
+            }
+
+            @Override
+            public int port() {
+                return container.getAmqpPort();
+            }
+        };
+    }
+
+    @Override
+    public void initialize() {
+        LOG.info("RabbitMQ container running on {}", container.getAmqpUrl());
+    }
+
+    @Override
+    public void shutdown() {
+        container.stop();
+    }
+}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java
new file mode 100644
index 0000000..b6957be
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+public class RabbitMQRemoteService implements RabbitMQService {
+
+
+    @Override
+    public ConnectionProperties connectionProperties() {
+        return null;
+    }
+
+    @Override
+    public void initialize() {
+
+    }
+
+    @Override
+    public void shutdown() {
+
+    }
+}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java
new file mode 100644
index 0000000..94e12bd
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public interface RabbitMQService extends BeforeAllCallback, AfterAllCallback {
+
+
+    /**
+     * The connection properties for the service
+     * @return
+     */
+    ConnectionProperties connectionProperties();
+
+
+    /**
+     * Perform any initialization necessary
+     */
+    void initialize();
+
+    /**
+     * Shuts down the service after the test has completed
+     */
+    void shutdown();
+
+
+    @Override
+    default void afterAll(ExtensionContext extensionContext) throws Exception {
+        shutdown();
+    }
+
+    @Override
+    default void beforeAll(ExtensionContext extensionContext) throws Exception {
+        initialize();
+    }
+}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java
new file mode 100644
index 0000000..50013dd
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class RabbitMQServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQServiceFactory.class);
+
+    private RabbitMQServiceFactory() {
+
+    }
+
+    public static RabbitMQService createService() {
+        String instanceType = System.getProperty("rabbitmq.instance.type");
+
+        if (instanceType == null || instanceType.equals("local-rabbitmq-container")) {
+            return new RabbitMQLocalContainerService();
+        }
+
+        if (instanceType.equals("remote")) {
+            return new RabbitMQRemoteService();
+        }
+
+        LOG.error("rabbit-mq instance must be one of 'local-rabbitmq-container' or 'remote");
+        throw new UnsupportedOperationException(String.format("Invalid rabbitmq instance type: %s", instanceType));
+
+    }
+}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java
new file mode 100644
index 0000000..abd8ed6
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.source;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+public class CamelRabbitMQPropertyFactory extends SourceConnectorPropertyFactory<CamelRabbitMQPropertyFactory> {
+    public CamelRabbitMQPropertyFactory withHostname(String value) {
+        return setProperty("camel.component.rabbitmq.hostname", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withPortNumber(int value) {
+        return setProperty("camel.component.rabbitmq.portNumber", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withUsername(String value) {
+        return setProperty("camel.component.rabbitmq.username", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withPassword(String value) {
+        return setProperty("camel.component.rabbitmq.password", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withExchangeName(String value) {
+        return setProperty("camel.source.path.exchangeName", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withExchangeType(String value) {
+        return setProperty("camel.source.endpoint.exchangeType", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withAutoDelete(boolean value) {
+        return setProperty("camel.source.endpoint.autoDelete", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withQueue(String value) {
+        return setProperty("camel.source.endpoint.queue", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withRoutingKey(String value) {
+        return setProperty("camel.source.endpoint.routingKey", value);
+    }
+
+    public EndpointUrlBuilder<CamelRabbitMQPropertyFactory> withUrl(String hostname, int port, String exchangeName) {
+        String sourceUrl = String.format("rabbitmq://%s:%d/%s", hostname, port, exchangeName);
+
+        return new EndpointUrlBuilder<>(this::withSourceUrl, sourceUrl);
+    }
+
+    public static CamelRabbitMQPropertyFactory basic() {
+        return new CamelRabbitMQPropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelRabbitmqSourceConnector")
+                    .withConnectorClass("org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector")
+                    .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    .withValueConverterClass("org.apache.kafka.connect.converters.ByteArrayConverter");
+    
+    }
+
+}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourcePerformanceITCase.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourcePerformanceITCase.java
new file mode 100644
index 0000000..d3b04ab
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourcePerformanceITCase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.source;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQService;
+import org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQServiceFactory;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+@Testcontainers
+@EnabledIfSystemProperty(named = "it.test.perf.enabled", matches = "true")
+public class RabbitMQSourcePerformanceITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static RabbitMQService service = RabbitMQServiceFactory.createService();
+
+    private static int duration;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-rabbitmq-kafka-connector"};
+    }
+
+    @BeforeAll
+    public static void setUpAll() {
+        duration = Integer.parseInt(System.getProperty("rabbitmq.test.duration", "5"));
+    }
+
+    @Test
+    public void testMemory() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory
+                .basic()
+                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withUrl(service.connectionProperties().hostname(), service.connectionProperties().port(),
+                        "X.test")
+                .append("username", service.connectionProperties().username())
+                .append("password", service.connectionProperties().password())
+                .append("autoDelete", "true")
+                .append("queue", "Q.test.kafka.import")
+                .append("routingKey", "events")
+                .buildUrl();
+
+        factory.log();
+        getKafkaConnectService().initializeConnector(factory);
+
+        Thread.sleep(Duration.ofMinutes(duration).toMillis());
+    }
+
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index 52f63e7..f65a488 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -58,6 +58,7 @@
         <module>itests-jdbc</module>
         <module>itests-azure-common</module>
         <module>itests-azure-storage-queue</module>
+        <module>perf-tests-rabbitmq</module>
     </modules>