You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2020/10/03 07:51:00 UTC

[camel-kafka-connector] branch master updated (07e43f0 -> 993322a)

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from 07e43f0  Updated CHANGELOG.md
     new b7510cb  Added RabbitMQ itests.
     new 993322a  Refactor per-tests-rebbitmq to depend on itests-rabbitmq.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../{itests-aws-common => itests-rabbitmq}/pom.xml |  22 ++-
 .../rabbitmq/clients/RabbitMQClient.java           | 184 +++++++++++++++++++++
 .../rabbitmq/services/ConnectionProperties.java    |   0
 .../services/RabbitMQLocalContainerService.java    |   6 +
 .../rabbitmq/services/RabbitMQRemoteService.java   |   7 +
 .../rabbitmq/services/RabbitMQService.java         |   6 +
 .../rabbitmq/services/RabbitMQServiceFactory.java  |   0
 .../sink}/CamelRabbitMQPropertyFactory.java        |  16 +-
 .../rabbitmq/sink/RabbitMQSinkITCase.java          | 145 ++++++++++++++++
 .../source/CamelRabbitMQPropertyFactory.java       |   4 +-
 .../rabbitmq/source/RabbitMQSourceITCase.java}     |  97 ++++-------
 tests/perf-tests-rabbitmq/pom.xml                  |   9 +
 tests/pom.xml                                      |   1 +
 13 files changed, 423 insertions(+), 74 deletions(-)
 copy tests/{itests-aws-common => itests-rabbitmq}/pom.xml (78%)
 create mode 100644 tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/clients/RabbitMQClient.java
 rename tests/{perf-tests-rabbitmq => itests-rabbitmq}/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java (100%)
 rename tests/{perf-tests-rabbitmq => itests-rabbitmq}/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java (91%)
 rename tests/{perf-tests-rabbitmq => itests-rabbitmq}/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java (87%)
 rename tests/{perf-tests-rabbitmq => itests-rabbitmq}/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java (90%)
 rename tests/{perf-tests-rabbitmq => itests-rabbitmq}/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java (100%)
 copy tests/{perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source => itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink}/CamelRabbitMQPropertyFactory.java (82%)
 create mode 100644 tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
 copy tests/{perf-tests-rabbitmq => itests-rabbitmq}/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java (95%)
 copy tests/{itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java => itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java} (50%)


[camel-kafka-connector] 02/02: Refactor per-tests-rebbitmq to depend on itests-rabbitmq.

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 993322a0aa6c6ac591647250d5c39d4dfd99cd3a
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Fri Oct 2 22:46:32 2020 +0200

    Refactor per-tests-rebbitmq to depend on itests-rabbitmq.
---
 tests/itests-rabbitmq/pom.xml                      | 20 ++++---
 tests/perf-tests-rabbitmq/pom.xml                  |  9 +++
 .../rabbitmq/services/ConnectionProperties.java    | 25 --------
 .../services/RabbitMQLocalContainerService.java    | 67 ----------------------
 .../rabbitmq/services/RabbitMQRemoteService.java   | 37 ------------
 .../rabbitmq/services/RabbitMQService.java         | 54 -----------------
 .../rabbitmq/services/RabbitMQServiceFactory.java  | 45 ---------------
 7 files changed, 21 insertions(+), 236 deletions(-)

diff --git a/tests/itests-rabbitmq/pom.xml b/tests/itests-rabbitmq/pom.xml
index 4365c68..baa2a18 100644
--- a/tests/itests-rabbitmq/pom.xml
+++ b/tests/itests-rabbitmq/pom.xml
@@ -57,14 +57,18 @@
 
     <build>
         <plugins>
-<!--            <plugin>-->
-<!--                <groupId>org.apache.maven.plugins</groupId>-->
-<!--                <artifactId>maven-failsafe-plugin</artifactId>-->
-<!--                <configuration>-->
-<!--                    <argLine>${common.failsafe.args} ${jvm.user.settings} -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=${rmi.server} -Dcom.sun.management.jmxremote.rmi.port=${jmx.port}</argLine>-->
-<!--                    <skipTests>${skipIntegrationTests}</skipTests>-->
-<!--                </configuration>-->
-<!--            </plugin>-->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>test-compile</phase>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 
diff --git a/tests/perf-tests-rabbitmq/pom.xml b/tests/perf-tests-rabbitmq/pom.xml
index 5d73fce..887abef 100644
--- a/tests/perf-tests-rabbitmq/pom.xml
+++ b/tests/perf-tests-rabbitmq/pom.xml
@@ -53,6 +53,15 @@
             <artifactId>rabbitmq</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-rabbitmq</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java
deleted file mode 100644
index 15b7f8d..0000000
--- a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.rabbitmq.services;
-
-public interface ConnectionProperties {
-    String username();
-    String password();
-    String hostname();
-    int port();
-}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java
deleted file mode 100644
index b6ae6d1..0000000
--- a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.rabbitmq.services;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.RabbitMQContainer;
-
-public class RabbitMQLocalContainerService implements RabbitMQService {
-    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQLocalContainerService.class);
-
-    private final RabbitMQContainer container = new RabbitMQContainer("rabbitmq:3.8-management");
-
-    public RabbitMQLocalContainerService() {
-        container.start();
-    }
-
-    @Override
-    public ConnectionProperties connectionProperties() {
-        return new ConnectionProperties() {
-            @Override
-            public String username() {
-                return container.getAdminUsername();
-            }
-
-            @Override
-            public String password() {
-                return container.getAdminPassword();
-            }
-
-            @Override
-            public String hostname() {
-                return container.getHost();
-            }
-
-            @Override
-            public int port() {
-                return container.getAmqpPort();
-            }
-        };
-    }
-
-    @Override
-    public void initialize() {
-        LOG.info("RabbitMQ container running on {}", container.getAmqpUrl());
-    }
-
-    @Override
-    public void shutdown() {
-        container.stop();
-    }
-}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java
deleted file mode 100644
index b6957be..0000000
--- a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.rabbitmq.services;
-
-public class RabbitMQRemoteService implements RabbitMQService {
-
-
-    @Override
-    public ConnectionProperties connectionProperties() {
-        return null;
-    }
-
-    @Override
-    public void initialize() {
-
-    }
-
-    @Override
-    public void shutdown() {
-
-    }
-}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java
deleted file mode 100644
index 94e12bd..0000000
--- a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.rabbitmq.services;
-
-import org.junit.jupiter.api.extension.AfterAllCallback;
-import org.junit.jupiter.api.extension.BeforeAllCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
-
-public interface RabbitMQService extends BeforeAllCallback, AfterAllCallback {
-
-
-    /**
-     * The connection properties for the service
-     * @return
-     */
-    ConnectionProperties connectionProperties();
-
-
-    /**
-     * Perform any initialization necessary
-     */
-    void initialize();
-
-    /**
-     * Shuts down the service after the test has completed
-     */
-    void shutdown();
-
-
-    @Override
-    default void afterAll(ExtensionContext extensionContext) throws Exception {
-        shutdown();
-    }
-
-    @Override
-    default void beforeAll(ExtensionContext extensionContext) throws Exception {
-        initialize();
-    }
-}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java
deleted file mode 100644
index 50013dd..0000000
--- a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.rabbitmq.services;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class RabbitMQServiceFactory {
-    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQServiceFactory.class);
-
-    private RabbitMQServiceFactory() {
-
-    }
-
-    public static RabbitMQService createService() {
-        String instanceType = System.getProperty("rabbitmq.instance.type");
-
-        if (instanceType == null || instanceType.equals("local-rabbitmq-container")) {
-            return new RabbitMQLocalContainerService();
-        }
-
-        if (instanceType.equals("remote")) {
-            return new RabbitMQRemoteService();
-        }
-
-        LOG.error("rabbit-mq instance must be one of 'local-rabbitmq-container' or 'remote");
-        throw new UnsupportedOperationException(String.format("Invalid rabbitmq instance type: %s", instanceType));
-
-    }
-}


[camel-kafka-connector] 01/02: Added RabbitMQ itests.

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit b7510cbded3a82378b7f689e3c5506fc8bfabc25
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Fri Oct 2 22:17:27 2020 +0200

    Added RabbitMQ itests.
---
 tests/itests-rabbitmq/pom.xml                      |  72 ++++++++
 .../rabbitmq/clients/RabbitMQClient.java           | 184 +++++++++++++++++++++
 .../rabbitmq/services/ConnectionProperties.java    |  25 +++
 .../services/RabbitMQLocalContainerService.java    |  73 ++++++++
 .../rabbitmq/services/RabbitMQRemoteService.java   |  44 +++++
 .../rabbitmq/services/RabbitMQService.java         |  60 +++++++
 .../rabbitmq/services/RabbitMQServiceFactory.java  |  45 +++++
 .../sink/CamelRabbitMQPropertyFactory.java         |  76 +++++++++
 .../rabbitmq/sink/RabbitMQSinkITCase.java          | 145 ++++++++++++++++
 .../source/CamelRabbitMQPropertyFactory.java       |  76 +++++++++
 .../rabbitmq/source/RabbitMQSourceITCase.java      | 109 ++++++++++++
 tests/pom.xml                                      |   1 +
 12 files changed, 910 insertions(+)

diff --git a/tests/itests-rabbitmq/pom.xml b/tests/itests-rabbitmq/pom.xml
new file mode 100644
index 0000000..4365c68
--- /dev/null
+++ b/tests/itests-rabbitmq/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.6.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-rabbitmq</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: RabbitMQ</name>
+
+    <properties>
+<!--        <jmx.port>9010</jmx.port>-->
+<!--        <rmi.server>localhost</rmi.server>-->
+<!--        <jvm.user.settings />-->
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-rabbitmq</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>rabbitmq</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+<!--            <plugin>-->
+<!--                <groupId>org.apache.maven.plugins</groupId>-->
+<!--                <artifactId>maven-failsafe-plugin</artifactId>-->
+<!--                <configuration>-->
+<!--                    <argLine>${common.failsafe.args} ${jvm.user.settings} -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=${rmi.server} -Dcom.sun.management.jmxremote.rmi.port=${jmx.port}</argLine>-->
+<!--                    <skipTests>${skipIntegrationTests}</skipTests>-->
+<!--                </configuration>-->
+<!--            </plugin>-->
+        </plugins>
+    </build>
+
+
+</project>
\ No newline at end of file
diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/clients/RabbitMQClient.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/clients/RabbitMQClient.java
new file mode 100644
index 0000000..19da657
--- /dev/null
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/clients/RabbitMQClient.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.rabbitmq.clients;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.MessageProperties;
+import org.junit.jupiter.api.Assertions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A basic RabbitMQ client
+ */
+public class RabbitMQClient {
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQClient.class);
+    private static final String DEFAULT_EXCHANGE_TYPE = "direct";
+
+    private Connection connection;
+    private Channel channel;
+
+    private ConnectionFactory factory;
+
+    public RabbitMQClient(String uri) {
+        factory = new ConnectionFactory();
+        try {
+            factory.setUri(uri);
+        } catch (Exception e) {
+            LOG.error("Unable to create the RabbitMQ client {}", e.getMessage(), e);
+            Assertions.fail(e);
+        }
+    }
+
+    private static void capturingClose(Closeable closeable, String closableDescription) {
+        LOG.debug("Closing the " + closableDescription);
+
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (Throwable t) {
+                LOG.warn("Error closing the {}: {}", closableDescription, t.getMessage(), t);
+            }
+        }
+    }
+
+    private static void capturingClose(AutoCloseable closeable, String closableDescription) {
+        LOG.debug("Closing the " + closableDescription);
+
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (Throwable t) {
+                LOG.warn("Error closing the {}: {}", closableDescription, t.getMessage(), t);
+            }
+        }
+    }
+
+    public void start() throws Exception {
+        LOG.debug("Starting the RabbitMQ client");
+
+        try {
+            LOG.debug("Creating the connection");
+            connection = factory.newConnection();
+            LOG.debug("Connection created successfully");
+
+            LOG.debug("Creating the Channel");
+            channel = connection.createChannel();
+            LOG.debug("Channel created successfully");
+        } catch (Throwable t) {
+            LOG.trace("Something wrong happened while initializing the RabbitMQ client: {}", t.getMessage(), t);
+
+            capturingClose(connection, "connection");
+            throw t;
+        }
+    }
+
+    public void stop() {
+        try {
+            LOG.debug("Stopping the channel");
+            capturingClose(channel, "channel");
+
+            LOG.debug("Stopping the RabbitMQ connection");
+            capturingClose(connection, "connection");
+        } finally {
+            channel = null;
+            connection = null;
+        }
+    }
+
+    public AMQP.Queue.DeclareOk createQueue(final String queueName) {
+        try {
+            start();
+            return channel.queueDeclare(queueName, true, false, false, null);
+        } catch (Exception e) {
+            Assertions.fail(e.getMessage());
+
+            // unreachable
+            return null;
+        } finally {
+            stop();
+        }
+    }
+
+    public AMQP.Exchange.DeclareOk createExchange(final String exchangeName) {
+        return createExchange(exchangeName, DEFAULT_EXCHANGE_TYPE);
+    }
+
+    public AMQP.Exchange.DeclareOk createExchange(final String exchangeName, final String exchangeType) {
+        try {
+            start();
+            return channel.exchangeDeclare(exchangeName, exchangeType);
+        } catch (Exception e) {
+            Assertions.fail(e.getMessage());
+
+            // unreachable
+            return null;
+        } finally {
+            stop();
+        }
+    }
+
+    public AMQP.Queue.BindOk bindExchangeToQueue(final String exchangeName, final String queueName) {
+        try {
+            start();
+            return channel.queueBind(exchangeName, exchangeName, "");
+        } catch (Exception e) {
+            Assertions.fail(e.getMessage());
+
+            // unreachable
+            return null;
+        } finally {
+            stop();
+        }
+    }
+
+    /**
+     * Sends data to a RabbitMQ queue
+     *
+     * @param queue the queue to send data to
+     * @param data  the (string) data to send
+     * @throws IOException
+     */
+    public void send(final String queue, final String data) {
+        try {
+            start();
+            channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN, data.getBytes(StandardCharsets.UTF_8));
+        } catch (Exception e) {
+            Assertions.fail(e.getMessage());
+        } finally {
+            stop();
+        }
+    }
+
+    /**
+     * Receives data from a JMS queue or topic
+     *
+     * @param queue     the queue or topic to receive data from
+     * @param deliverCallback the callback used to test each received messages
+     */
+    public void receive(final String queue, DeliverCallback deliverCallback) throws Exception {
+        channel.basicConsume(queue, true, deliverCallback, consumerTag -> { });
+    }
+}
diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java
new file mode 100644
index 0000000..15b7f8d
--- /dev/null
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+public interface ConnectionProperties {
+    String username();
+    String password();
+    String hostname();
+    int port();
+}
diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java
new file mode 100644
index 0000000..ee7a827
--- /dev/null
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.RabbitMQContainer;
+
+public class RabbitMQLocalContainerService implements RabbitMQService {
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQLocalContainerService.class);
+
+    private final RabbitMQContainer container = new RabbitMQContainer("rabbitmq:3.8-management");
+
+    public RabbitMQLocalContainerService() {
+        container.start();
+    }
+
+    @Override
+    public ConnectionProperties connectionProperties() {
+        return new ConnectionProperties() {
+            @Override
+            public String username() {
+                return container.getAdminUsername();
+            }
+
+            @Override
+            public String password() {
+                return container.getAdminPassword();
+            }
+
+            @Override
+            public String hostname() {
+                return container.getHost();
+            }
+
+            @Override
+            public int port() {
+                return container.getAmqpPort();
+            }
+        };
+    }
+
+    @Override
+    public RabbitMQClient getClient() {
+        return new RabbitMQClient(container.getAmqpUrl());
+    }
+
+    @Override
+    public void initialize() {
+        LOG.info("RabbitMQ container running on {}", container.getAmqpUrl());
+    }
+
+    @Override
+    public void shutdown() {
+        container.stop();
+    }
+}
diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java
new file mode 100644
index 0000000..74d2a48
--- /dev/null
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient;
+
+public class RabbitMQRemoteService implements RabbitMQService {
+
+
+    @Override
+    public ConnectionProperties connectionProperties() {
+        return null;
+    }
+
+    @Override
+    public RabbitMQClient getClient() {
+        return null;
+    }
+
+    @Override
+    public void initialize() {
+
+    }
+
+    @Override
+    public void shutdown() {
+
+    }
+}
diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java
new file mode 100644
index 0000000..d026ba2
--- /dev/null
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public interface RabbitMQService extends BeforeAllCallback, AfterAllCallback {
+
+
+    /**
+     * The connection properties for the service
+     * @return
+     */
+    ConnectionProperties connectionProperties();
+
+    /**
+     * Get the appropriate client for the service
+     * @return
+     */
+    RabbitMQClient getClient();
+
+    /**
+     * Perform any initialization necessary
+     */
+    void initialize();
+
+    /**
+     * Shuts down the service after the test has completed
+     */
+    void shutdown();
+
+
+    @Override
+    default void afterAll(ExtensionContext extensionContext) throws Exception {
+        shutdown();
+    }
+
+    @Override
+    default void beforeAll(ExtensionContext extensionContext) throws Exception {
+        initialize();
+    }
+}
diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java
new file mode 100644
index 0000000..50013dd
--- /dev/null
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class RabbitMQServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQServiceFactory.class);
+
+    private RabbitMQServiceFactory() {
+
+    }
+
+    public static RabbitMQService createService() {
+        String instanceType = System.getProperty("rabbitmq.instance.type");
+
+        if (instanceType == null || instanceType.equals("local-rabbitmq-container")) {
+            return new RabbitMQLocalContainerService();
+        }
+
+        if (instanceType.equals("remote")) {
+            return new RabbitMQRemoteService();
+        }
+
+        LOG.error("rabbit-mq instance must be one of 'local-rabbitmq-container' or 'remote");
+        throw new UnsupportedOperationException(String.format("Invalid rabbitmq instance type: %s", instanceType));
+
+    }
+}
diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/CamelRabbitMQPropertyFactory.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/CamelRabbitMQPropertyFactory.java
new file mode 100644
index 0000000..0c8e467
--- /dev/null
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/CamelRabbitMQPropertyFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+public class CamelRabbitMQPropertyFactory extends SinkConnectorPropertyFactory<CamelRabbitMQPropertyFactory> {
+    public CamelRabbitMQPropertyFactory withHostname(String value) {
+        return setProperty("camel.component.rabbitmq.hostname", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withPortNumber(int value) {
+        return setProperty("camel.component.rabbitmq.portNumber", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withUsername(String value) {
+        return setProperty("camel.component.rabbitmq.username", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withPassword(String value) {
+        return setProperty("camel.component.rabbitmq.password", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withExchangeName(String value) {
+        return setProperty("camel.source.path.exchangeName", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withExchangeType(String value) {
+        return setProperty("camel.source.endpoint.exchangeType", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withAutoDelete(boolean value) {
+        return setProperty("camel.source.endpoint.autoDelete", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withQueue(String value) {
+        return setProperty("camel.source.endpoint.queue", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withRoutingKey(String value) {
+        return setProperty("camel.source.endpoint.routingKey", value);
+    }
+
+    public EndpointUrlBuilder<CamelRabbitMQPropertyFactory> withUrl(String exchangeName) {
+        String sourceUrl = String.format("rabbitmq://%s", exchangeName);
+
+        return new EndpointUrlBuilder<>(this::withSinkUrl, sourceUrl);
+    }
+
+    public static CamelRabbitMQPropertyFactory basic() {
+        return new CamelRabbitMQPropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelRabbitmqSinkConnector")
+                    .withConnectorClass("org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSinkConnector")
+                    .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    .withValueConverterClass("org.apache.kafka.connect.converters.ByteArrayConverter");
+    
+    }
+
+}
diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
new file mode 100644
index 0000000..80b1606
--- /dev/null
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.rabbitmq.sink;
+
+import java.io.UnsupportedEncodingException;
+import java.util.concurrent.CountDownLatch;
+
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient;
+import org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQService;
+import org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQServiceFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Testcontainers
+public class RabbitMQSinkITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static RabbitMQService rabbitmqService = RabbitMQServiceFactory.createService();
+
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkITCase.class);
+    private static final String DEFAULT_RABBITMQ_QUEUE = "Q.test.kafka.import";
+
+    private RabbitMQClient rabbitMQClient;
+    private int received;
+    private final int expect = 10;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-rabbitmq-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        received = 0;
+        rabbitMQClient = rabbitmqService.getClient();
+    }
+
+    private boolean checkRecord(Delivery rabbitMQDelivery) {
+        try {
+            String message = new String(rabbitMQDelivery.getBody(), "UTF-8");
+            LOG.debug("Received: {}", message);
+
+            received++;
+
+            if (received == expect) {
+                return false;
+            }
+
+            return true;
+        } catch (UnsupportedEncodingException e) {
+            LOG.error("Failed to read message: {}", e.getMessage(), e);
+            fail("Failed to read message: " + e.getMessage());
+            return false;
+        }
+    }
+
+    private void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        LOG.debug("Creating the consumer ...");
+        rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE);
+        try {
+            rabbitMQClient.start();
+            consumeRabbitMQMessages(latch);
+
+            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+            for (int i = 0; i < expect; i++) {
+                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
+            }
+
+            LOG.debug("Created the consumer ... About to receive messages");
+
+            latch.await();
+            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } finally {
+            rabbitMQClient.stop();
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testSource() throws Exception {
+        ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory
+                .basic()
+                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withUrl("")
+                .append("username", rabbitmqService.connectionProperties().username())
+                .append("password", rabbitmqService.connectionProperties().password())
+                .append("autoDelete", "false")
+                .append("queue", DEFAULT_RABBITMQ_QUEUE)
+                .append("RoutingKey", DEFAULT_RABBITMQ_QUEUE)
+                .append("skipExchangeDeclare", "true")
+                .append("skipQueueBind", "true")
+                .append("hostname", rabbitmqService.connectionProperties().hostname())
+                .append("portNumber", rabbitmqService.connectionProperties().port())
+                .buildUrl();
+
+        runBasicStringTest(factory);
+    }
+
+    private void consumeRabbitMQMessages(CountDownLatch latch) {
+        DeliverCallback deliveryCallback = (consumerTag, delivery) -> {
+            if (!this.checkRecord(delivery)) {
+                latch.countDown();
+            }
+        };
+        try {
+            rabbitMQClient.receive(DEFAULT_RABBITMQ_QUEUE, deliveryCallback);
+        } catch (Exception e) {
+            LOG.error("RabbitMQ test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+}
diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java
new file mode 100644
index 0000000..1548b2e
--- /dev/null
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.source;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+public class CamelRabbitMQPropertyFactory extends SourceConnectorPropertyFactory<CamelRabbitMQPropertyFactory> {
+    public CamelRabbitMQPropertyFactory withHostname(String value) {
+        return setProperty("camel.component.rabbitmq.hostname", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withPortNumber(int value) {
+        return setProperty("camel.component.rabbitmq.portNumber", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withUsername(String value) {
+        return setProperty("camel.component.rabbitmq.username", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withPassword(String value) {
+        return setProperty("camel.component.rabbitmq.password", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withExchangeName(String value) {
+        return setProperty("camel.source.path.exchangeName", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withExchangeType(String value) {
+        return setProperty("camel.source.endpoint.exchangeType", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withAutoDelete(boolean value) {
+        return setProperty("camel.source.endpoint.autoDelete", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withQueue(String value) {
+        return setProperty("camel.source.endpoint.queue", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withRoutingKey(String value) {
+        return setProperty("camel.source.endpoint.routingKey", value);
+    }
+
+    public EndpointUrlBuilder<CamelRabbitMQPropertyFactory> withUrl(String exchangeName) {
+        String sourceUrl = String.format("rabbitmq://%s", exchangeName);
+
+        return new EndpointUrlBuilder<>(this::withSourceUrl, sourceUrl);
+    }
+
+    public static CamelRabbitMQPropertyFactory basic() {
+        return new CamelRabbitMQPropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelRabbitmqSourceConnector")
+                    .withConnectorClass("org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector")
+                    .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    .withValueConverterClass("org.apache.kafka.connect.converters.ByteArrayConverter");
+    
+    }
+
+}
diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java
new file mode 100644
index 0000000..73a75e3
--- /dev/null
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.rabbitmq.source;
+
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient;
+import org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQService;
+import org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQServiceFactory;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Testcontainers
+public class RabbitMQSourceITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static RabbitMQService rabbitmqService = RabbitMQServiceFactory.createService();
+
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSourceITCase.class);
+    private static final String DEFAULT_RABBITMQ_QUEUE = "Q.test.kafka.import";
+
+    private RabbitMQClient rabbitMQClient;
+    private int received;
+    private final int expect = 10;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-rabbitmq-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        received = 0;
+        rabbitMQClient = rabbitmqService.getClient();
+    }
+
+    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
+        LOG.debug("Received: {}", record.value());
+        received++;
+
+        if (received == expect) {
+            return false;
+        }
+
+        return true;
+    }
+
+    public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+        rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE);
+
+        for (int i = 0; i < expect; i++) {
+            rabbitMQClient.send(DEFAULT_RABBITMQ_QUEUE, "Test string message");
+        }
+
+        LOG.debug("Creating the kafka consumer ...");
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
+        LOG.debug("Created the kafka consumer ...");
+
+        assertEquals(received, expect, "Didn't process the expected amount of messages");
+    }
+
+    @Test
+    @Timeout(90)
+    public void testSource() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory
+                .basic()
+                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withUrl("")
+                .append("username", rabbitmqService.connectionProperties().username())
+                .append("password", rabbitmqService.connectionProperties().password())
+                .append("autoDelete", "false")
+                .append("queue", DEFAULT_RABBITMQ_QUEUE)
+                .append("skipExchangeDeclare", "true")
+                .append("skipQueueBind", "true")
+                .append("hostname", rabbitmqService.connectionProperties().hostname())
+                .append("portNumber", rabbitmqService.connectionProperties().port())
+                .buildUrl();
+
+        runBasicStringTest(factory);
+    }
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index d52caed..c95de51 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -59,6 +59,7 @@
         <module>itests-azure-common</module>
         <module>itests-azure-storage-queue</module>
         <module>perf-tests-rabbitmq</module>
+        <module>itests-rabbitmq</module>
     </modules>