You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/09/02 12:48:32 UTC

[camel-kafka-connector] branch master updated: Added new test for Azure Storage Queue sink

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b0b626  Added new test for Azure Storage Queue sink
0b0b626 is described below

commit 0b0b6267e9472e886f74d4a579f5535b7b95dac4
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Aug 31 14:18:43 2020 +0200

    Added new test for Azure Storage Queue sink
---
 README.adoc                                        |   5 +
 tests/itests-azure-common/pom.xml                  |  61 ++++++++
 .../azure/common/AzureCredentialsHolder.java       |  23 +++
 .../azure/common/services/AzureService.java        |  56 +++++++
 .../azure/common/services/AzureServices.java       |  26 ++++
 .../azure/common/services/AzureStorageService.java |  47 ++++++
 .../azure/common/services/AzuriteContainer.java    |  56 +++++++
 tests/itests-azure-storage-queue/pom.xml           |  76 ++++++++++
 .../azure/storage/common/AzureConfigs.java         |  29 ++++
 .../sink/CamelSinkAzureStorageQueueITCase.java     | 166 +++++++++++++++++++++
 .../CamelSinkAzureStorageQueuePropertyFactory.java |  61 ++++++++
 .../storage/queue/sink/TestQueueConfiguration.java |  39 +++++
 .../storage/services/AzureStorageClientUtils.java  |  61 ++++++++
 .../AzureStorageQueueLocalContainerService.java    |  47 ++++++
 .../services/AzureStorageQueueRemoteService.java   |  57 +++++++
 .../services/AzureStorageQueueServiceFactory.java  |  46 ++++++
 .../src/test/resources/log4j2.properties           |   8 +-
 tests/pom.xml                                      |   2 +
 18 files changed, 865 insertions(+), 1 deletion(-)

diff --git a/README.adoc b/README.adoc
index b6e614a..4da89c8 100644
--- a/README.adoc
+++ b/README.adoc
@@ -51,6 +51,11 @@ for remote testing:
 ** secret.key: AWS secret key (mandatory for remote testing)
 ** aws.region: AWS region (optional)
 ** aws.host: AWS host (optional)
+* azure.instance.type
+** azure.storage.queue.host
+** azure.storage.queue.port
+** azure.storage.queue.account.name
+** azure.storage.queue.account.key
 * elasticsearch.instance.type
 ** elasticsearch.host
 ** elasticsearch.port
diff --git a/tests/itests-azure-common/pom.xml b/tests/itests-azure-common/pom.xml
new file mode 100644
index 0000000..710d998
--- /dev/null
+++ b/tests/itests-azure-common/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.5.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-azure-common</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: Azure Common</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>test-compile</phase>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+
+</project>
\ No newline at end of file
diff --git a/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/AzureCredentialsHolder.java b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/AzureCredentialsHolder.java
new file mode 100644
index 0000000..7d337b5
--- /dev/null
+++ b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/AzureCredentialsHolder.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.common;
+
+public interface AzureCredentialsHolder { // exposes the account credentials used by the Azure test services
+    String accountName(); // name of the storage account
+    String accountKey(); // access key for that account
+}
diff --git a/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureService.java b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureService.java
new file mode 100644
index 0000000..5f43f10
--- /dev/null
+++ b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureService.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.common.services;
+
+import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public interface AzureService<T> extends BeforeAllCallback, AfterAllCallback {
+
+    /**
+     * Gets the credentials for the test service
+     * @return the holder providing the account name and the account key
+     */
+    AzureCredentialsHolder azureCredentials();
+
+    /** Gets the client, of the type chosen by the implementation, for the test service */
+    T getClient();
+
+    /**
+     * Perform any initialization necessary
+     */
+    void initialize();
+
+    /**
+     * Shuts down the service after the test has completed
+     */
+    void shutdown();
+
+    /** JUnit 5 callback: delegates to {@link #initialize()} before all the tests of the class */
+    @Override
+    default void beforeAll(ExtensionContext extensionContext) throws Exception {
+        initialize();
+    }
+
+    @Override
+    default void afterAll(ExtensionContext extensionContext) throws Exception {
+        shutdown(); // JUnit 5 callback: runs after all the tests of the class
+    }
+}
diff --git a/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureServices.java b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureServices.java
new file mode 100644
index 0000000..b837215
--- /dev/null
+++ b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureServices.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.common.services;
+
+public final class AzureServices { // ports of the services exposed by the Azurite container
+    public static final int BLOB_SERVICE = 10000; // container port of the blob service
+    public static final int QUEUE_SERVICE = 10001; // container port of the queue service
+
+    private AzureServices() {
+    }
+}
diff --git a/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureStorageService.java b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureStorageService.java
new file mode 100644
index 0000000..8005285
--- /dev/null
+++ b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureStorageService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.common.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AzureStorageService<T> implements AzureService<T> { // service backed by a local Azurite container
+    private static final Logger LOG = LoggerFactory.getLogger(AzureStorageService.class);
+    private final AzuriteContainer container = new AzuriteContainer();
+
+    public AzureStorageService() {
+        container.start(); // NOTE: the container is started at construction time, not in initialize()
+    }
+
+    protected AzuriteContainer getContainer() {
+        return container;
+    }
+
+    @Override
+    public void initialize() { // only logs the host and mapped ports of the blob and queue services
+        LOG.info("Azurite local blob service running at address {}:{}", container.getHost(),
+                container.getMappedPort(AzureServices.BLOB_SERVICE));
+        LOG.info("Azurite local queue service running at address {}:{}", container.getHost(),
+                container.getMappedPort(AzureServices.QUEUE_SERVICE));
+    }
+
+    @Override
+    public void shutdown() {
+        container.stop(); // stops the container started by the constructor
+    }
+}
diff --git a/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzuriteContainer.java b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzuriteContainer.java
new file mode 100644
index 0000000..2bba8a7
--- /dev/null
+++ b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzuriteContainer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.common.services;
+
+import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+public class AzuriteContainer extends GenericContainer<AzuriteContainer> {
+    public static final String DEFAULT_ACCOUNT_NAME = "devstoreaccount1";
+    public static final String DEFAULT_ACCOUNT_KEY = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+
+    private static final String CONTAINER_NAME = "mcr.microsoft.com/azure-storage/azurite:3.8.0"; // used by the no-arg constructor
+
+    public AzuriteContainer() {
+        this(CONTAINER_NAME);
+    }
+
+    public AzuriteContainer(String containerName) {
+        super(containerName);
+        // Expose the blob and queue service ports declared in AzureServices
+        withExposedPorts(AzureServices.BLOB_SERVICE, AzureServices.QUEUE_SERVICE);
+        // Wait strategy: the container counts as started once its ports are listening
+        waitingFor(Wait.forListeningPort());
+    }
+
+    public AzureCredentialsHolder azureCredentials() {
+        // Default credentials for Azurite (the DEFAULT_ACCOUNT_* constants above)
+        return new AzureCredentialsHolder() {
+            @Override
+            public String accountName() {
+                return DEFAULT_ACCOUNT_NAME;
+            }
+
+            @Override
+            public String accountKey() {
+                return DEFAULT_ACCOUNT_KEY;
+            }
+        };
+    }
+}
diff --git a/tests/itests-azure-storage-queue/pom.xml b/tests/itests-azure-storage-queue/pom.xml
new file mode 100644
index 0000000..a85b48a
--- /dev/null
+++ b/tests/itests-azure-storage-queue/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.5.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-azure-storage-queue</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: Azure Storage Queue</name>
+
+    <properties>
+        <version.netty.azure>4.1.49.Final</version.netty.azure>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-azure-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!--
+            NOTE: the only reason we have this one here is because there is a conflict between the
+            version of Netty used by the azure client and the one used within Kafka Connect.
+            This leads to strange issues such as NoSuchMethodExceptions. To work around this, we
+            explicitly include the Netty version used by Azure here, so it forces the one from Kafka
+            Connect out.
+            -->
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
+            <version>${version.netty.azure}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-azure-storage-queue</artifactId>
+        </dependency>
+
+    </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java
new file mode 100644
index 0000000..3a55fe2
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.common;
+
+public final class AzureConfigs { // names of the configuration properties for the Azure Storage Queue tests
+    public static final String HOST = "azure.storage.queue.host";
+    public static final String PORT = "azure.storage.queue.port";
+    public static final String ACCOUNT_NAME = "azure.storage.queue.account.name";
+    public static final String ACCOUNT_KEY = "azure.storage.queue.account.key";
+
+    private AzureConfigs() {
+        // constants holder: not meant to be instantiated
+    }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
new file mode 100644
index 0000000..d9c0e60
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.queue.sink;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import com.azure.storage.queue.QueueClient;
+import com.azure.storage.queue.QueueServiceClient;
+import com.azure.storage.queue.models.PeekedMessageItem;
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
+import org.apache.camel.kafkaconnector.azure.common.services.AzureService;
+import org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageQueueServiceFactory;
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CamelSinkAzureStorageQueueITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static AzureService<QueueServiceClient> service = AzureStorageQueueServiceFactory.createAzureService();
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAzureStorageQueueITCase.class);
+
+    private QueueServiceClient client;
+    private QueueClient queueClient;
+    private String queueName;
+    private int expect = 10;
+    private int received;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[]{"camel-azure-storage-queue-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        client = service.getClient();
+        queueName = "test-queue" + TestUtils.randomWithRange(0, 100);
+
+        queueClient = client.createQueue(queueName);
+        received = 0;
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (client != null) {
+            client.deleteQueue(queueName);
+        }
+    }
+
+    private void acknowledgeReceived(PeekedMessageItem peekedMessageItem) {
+        received++;
+        LOG.info("Received: {}", peekedMessageItem.getMessageText());
+    }
+
+    private boolean canConsume() {
+        return queueClient.getProperties().getApproximateMessagesCount() >= expect;
+    }
+
+    private void consume() {
+        LOG.debug("Created the consumer ...");
+        TestUtils.waitFor(this::canConsume);
+
+        LOG.debug("About to receive messages");
+        int count = queueClient.getProperties().getApproximateMessagesCount();
+
+        queueClient.peekMessages(count, null, null).forEach(this::acknowledgeReceived);
+
+    }
+
+    private void putRecords() {
+        Map<String, String> messageParameters = new HashMap<>();
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        for (int i = 0; i < expect; i++) {
+            try {
+                // This is for 3.4 only. From 3.5 and newer, the text is taken from the body
+                messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageQueueMessageText", "test " + i);
+
+                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test " + i, messageParameters);
+            } catch (ExecutionException e) {
+                LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+            } catch (InterruptedException e) {
+                break;
+            }
+        }
+    }
+
+
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException, IOException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+
+        putRecords();
+
+        consume();
+
+        assertEquals(expect, received, "Did not receive the same amount of messages that were sent");
+    }
+
+
+    @Disabled("Disabled due to issue #409")
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws InterruptedException, ExecutionException, IOException {
+        AzureCredentialsHolder azureCredentialsHolder = service.azureCredentials();
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSinkAzureStorageQueuePropertyFactory
+                .basic()
+                .withConfiguration(TestQueueConfiguration.class.getName())
+                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withAccessKey(azureCredentialsHolder.accountKey())
+                .withAccountName(azureCredentialsHolder.accountName())
+                .withOperation("sendMessage")
+                .withQueueName(queueName);
+
+        runTest(connectorPropertyFactory);
+    }
+
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceiveUrl() throws InterruptedException, ExecutionException, IOException {
+        AzureCredentialsHolder azureCredentialsHolder = service.azureCredentials();
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSinkAzureStorageQueuePropertyFactory
+                .basic()
+                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withConfiguration(TestQueueConfiguration.class.getName())
+                .withUrl(azureCredentialsHolder.accountName() + "/" + queueName)
+                .append("accessKey", azureCredentialsHolder.accountKey())
+                .append("operation", "sendMessage")
+                .buildUrl();
+
+        runTest(connectorPropertyFactory);
+    }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueuePropertyFactory.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueuePropertyFactory.java
new file mode 100644
index 0000000..c6f22f1
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueuePropertyFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.queue.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+public class CamelSinkAzureStorageQueuePropertyFactory extends SinkConnectorPropertyFactory<CamelSinkAzureStorageQueuePropertyFactory> {
+
+    public CamelSinkAzureStorageQueuePropertyFactory withAccountName(String value) {
+        return setProperty("camel.sink.path.accountName", value);
+    }
+
+    public CamelSinkAzureStorageQueuePropertyFactory withQueueName(String value) {
+        return setProperty("camel.sink.path.queueName", value);
+    }
+
+    public CamelSinkAzureStorageQueuePropertyFactory withAccessKey(String value) {
+        return setProperty("camel.component.azure-storage-queue.accessKey", value);
+    }
+
+    public CamelSinkAzureStorageQueuePropertyFactory withConfiguration(String configurationClass) {
+        return setProperty("camel.component.azure-storage-queue.configuration", classRef(configurationClass));
+    }
+
+    public CamelSinkAzureStorageQueuePropertyFactory withOperation(String value) {
+        return setProperty("camel.component.azure-storage-queue.operation", value);
+    }
+
+    public EndpointUrlBuilder<CamelSinkAzureStorageQueuePropertyFactory> withUrl(String destinationName) {
+        String sinkUrl = String.format("azure-storage-queue://%s", destinationName);
+        // the builder gets withSinkUrl as its callback; presumably it applies the assembled URL on buildUrl() - TODO confirm
+        return new EndpointUrlBuilder<>(this::withSinkUrl, sinkUrl);
+    }
+
+    /** Creates a factory preset with one task, the connector name and class, and String key/value converters */
+    public static CamelSinkAzureStorageQueuePropertyFactory basic() {
+        return new CamelSinkAzureStorageQueuePropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelAzurestoragequeueSinkConnector")
+                    .withConnectorClass("org.apache.camel.kafkaconnector.azurestoragequeue.CamelAzurestoragequeueSinkConnector")
+                    .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+
+    }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/TestQueueConfiguration.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/TestQueueConfiguration.java
new file mode 100644
index 0000000..d91c43e
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/TestQueueConfiguration.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.queue.sink;
+
+import com.azure.storage.queue.QueueServiceClient;
+import org.apache.camel.component.azure.storage.queue.QueueConfiguration;
+import org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageClientUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestQueueConfiguration extends QueueConfiguration {
+    private static final Logger LOG = LoggerFactory.getLogger(TestQueueConfiguration.class);
+    private QueueServiceClient serviceClient; // built on first use by getServiceClient(), then reused
+
+    @Override
+    public QueueServiceClient getServiceClient() {
+        if (serviceClient == null) {
+            // Client comes from AzureStorageClientUtils (test system properties); log only when one is built
+            LOG.info("Creating a custom QueueServiceClient");
+            serviceClient = AzureStorageClientUtils.getClient();
+        }
+        return serviceClient;
+    }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java
new file mode 100644
index 0000000..5087c07
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.services;
+
+import com.azure.core.http.policy.HttpLogDetailLevel;
+import com.azure.core.http.policy.HttpLogOptions;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.queue.QueueServiceClient;
+import com.azure.storage.queue.QueueServiceClientBuilder;
+import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
+
+public final class AzureStorageClientUtils {
+
+    private AzureStorageClientUtils() {
+        // static helper class: no instances
+    }
+
+    /** Builds a new shared-key QueueServiceClient from the AzureConfigs and "azure.instance.type" system properties. */
+    public static QueueServiceClient getClient() {
+        String instanceType = System.getProperty("azure.instance.type");
+        String accountName = System.getProperty(AzureConfigs.ACCOUNT_NAME);
+        String accountKey = System.getProperty(AzureConfigs.ACCOUNT_KEY);
+        StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
+
+        String host = System.getProperty(AzureConfigs.HOST);
+        String port = System.getProperty(AzureConfigs.PORT);
+        boolean localContainer = instanceType == null || instanceType.equals("local-azure-container");
+        boolean hasHost = host != null && !host.isEmpty();
+
+        String endpoint;
+        if (localContainer || hasHost) {
+            // Explicit host and port; the account name is appended as the URL path
+            endpoint = String.format("http://%s:%s/%s", host, port, accountName);
+        } else {
+            // Public Azure endpoint. The account key is a secret and is already carried by the credential
+            // above, so it must not be appended to the URL (it could end up in the HTTP logs enabled below).
+            endpoint = String.format("https://%s.queue.core.windows.net", accountName);
+        }
+
+        return new QueueServiceClientBuilder()
+                .endpoint(endpoint)
+                .credential(credential)
+                .httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BODY_AND_HEADERS).setPrettyPrintBody(true))
+                .buildClient();
+    }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java
new file mode 100644
index 0000000..6aeae34
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.services;
+
+import com.azure.storage.queue.QueueServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
+import org.apache.camel.kafkaconnector.azure.common.services.AzureServices;
+import org.apache.camel.kafkaconnector.azure.common.services.AzureStorageService;
+import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
+
+public class AzureStorageQueueLocalContainerService extends AzureStorageService<QueueServiceClient> {
+    // Exports the container's connection details as the system properties read by AzureStorageClientUtils.getClient()
+    @Override
+    public void initialize() {
+        super.initialize();
+        // Runs after super.initialize(); presumably the container is up by then — TODO confirm in AzureStorageService
+        System.setProperty(AzureConfigs.ACCOUNT_NAME, getContainer().azureCredentials().accountName());
+        System.setProperty(AzureConfigs.ACCOUNT_KEY, getContainer().azureCredentials().accountKey());
+        System.setProperty(AzureConfigs.HOST, getContainer().getContainerIpAddress());
+        System.setProperty(AzureConfigs.PORT, String.valueOf(getContainer().getMappedPort(AzureServices.QUEUE_SERVICE)));
+    }
+
+    @Override
+    public AzureCredentialsHolder azureCredentials() {
+        return getContainer().azureCredentials();
+    }
+
+    @Override
+    public QueueServiceClient getClient() {
+        return AzureStorageClientUtils.getClient(); // reads the system properties set in initialize()
+    }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java
new file mode 100644
index 0000000..12912fd
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.services;
+
+import com.azure.storage.queue.QueueServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
+import org.apache.camel.kafkaconnector.azure.common.services.AzureService;
+import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
+
+public class AzureStorageQueueRemoteService implements AzureService<QueueServiceClient> {
+    // Service for an Azure instance the tests do not manage: nothing is started or stopped here
+    @Override
+    public void initialize() {
+        // NO-OP: nothing to start for a remote instance
+    }
+
+    @Override
+    public void shutdown() {
+        // NO-OP: nothing to stop for a remote instance
+    }
+
+    @Override
+    public AzureCredentialsHolder azureCredentials() {
+        // Credentials are read from the AzureConfigs system properties each time an accessor is called
+        return new AzureCredentialsHolder() {
+            @Override
+            public String accountName() {
+                return System.getProperty(AzureConfigs.ACCOUNT_NAME);
+            }
+
+            @Override
+            public String accountKey() {
+                return System.getProperty(AzureConfigs.ACCOUNT_KEY);
+            }
+        };
+    }
+
+    @Override
+    public QueueServiceClient getClient() {
+        return AzureStorageClientUtils.getClient();
+    }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueServiceFactory.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueServiceFactory.java
new file mode 100644
index 0000000..86c321d
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueServiceFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.services;
+
+import com.azure.storage.queue.QueueServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.services.AzureService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class AzureStorageQueueServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(AzureStorageQueueServiceFactory.class);
+
+    private AzureStorageQueueServiceFactory() {
+        // static factory: no instances
+    }
+
+    /** Selects the service implementation from the "azure.instance.type" system property. */
+    public static AzureService<QueueServiceClient> createAzureService() {
+        String instanceType = System.getProperty("azure.instance.type");
+        // An unset property falls back to the local container
+        if (instanceType == null || instanceType.equals("local-azure-container")) {
+            return new AzureStorageQueueLocalContainerService();
+        }
+        if (instanceType.equals("remote")) {
+            return new AzureStorageQueueRemoteService();
+        }
+
+        LOG.error("Azure instance must be one of 'local-azure-container' or 'remote'");
+        throw new UnsupportedOperationException(String.format("Invalid Azure instance type: %s", instanceType));
+    }
+}
diff --git a/tests/itests-common/src/test/resources/log4j2.properties b/tests/itests-common/src/test/resources/log4j2.properties
index 72b615e..aa48d0a 100644
--- a/tests/itests-common/src/test/resources/log4j2.properties
+++ b/tests/itests-common/src/test/resources/log4j2.properties
@@ -88,4 +88,10 @@ logger.aws1.appenderRef.file.ref = file
 logger.aws2.name = software.amazon.awssdk
 logger.aws2.level = INFO
 logger.aws2.additivity = false
-logger.aws2.appenderRef.file.ref = file
\ No newline at end of file
+logger.aws2.appenderRef.file.ref = file
+
+# Azure SDK (com.azure): trace level, written to the file appender only (additivity is off)
+logger.azure.name = com.azure
+logger.azure.level = trace
+logger.azure.additivity = false
+logger.azure.appenderRef.file.ref = file
diff --git a/tests/pom.xml b/tests/pom.xml
index 23855c9..52f63e7 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -56,6 +56,8 @@
         <module>itests-hdfs</module>
         <module>itests-mongodb</module>
         <module>itests-jdbc</module>
+        <module>itests-azure-common</module>
+        <module>itests-azure-storage-queue</module>
     </modules>