You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/10/13 10:49:19 UTC

[camel-kafka-connector] branch master updated: Added new test for Azure Storage Blob sink

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 24db35f  Added new test for Azure Storage Blob sink
24db35f is described below

commit 24db35f62bb72561024f1015714a5745a777ee7a
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Oct 12 09:48:04 2020 +0200

    Added new test for Azure Storage Blob sink
---
 .../kafkaconnector/azure}/common/AzureConfigs.java |  10 +-
 tests/itests-azure-storage-blob/pom.xml            |  76 ++++++++++
 .../blob/sink/CamelSinkAzureStorageBlobITCase.java | 159 +++++++++++++++++++++
 .../CamelSinkAzureStorageBlobPropertyFactory.java  |  65 +++++++++
 .../storage/blob/sink/TestBlobConfiguration.java}  |  24 +++-
 .../services/AzureStorageBlobClientUtils.java}     |  14 +-
 .../AzureStorageBlobLocalContainerService.java}    |  12 +-
 .../services/AzureStorageBlobRemoteService.java}   |  10 +-
 .../services/AzureStorageBlobServiceFactory.java   |  46 ++++++
 .../storage/services/AzureStorageClientUtils.java  |   2 +-
 .../AzureStorageQueueLocalContainerService.java    |   2 +-
 .../services/AzureStorageQueueRemoteService.java   |   2 +-
 tests/pom.xml                                      |   1 +
 13 files changed, 390 insertions(+), 33 deletions(-)

diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/AzureConfigs.java
similarity index 71%
copy from tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java
copy to tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/AzureConfigs.java
index 3a55fe2..b90bf80 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java
+++ b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/AzureConfigs.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kafkaconnector.azure.storage.common;
+package org.apache.camel.kafkaconnector.azure.common;
 
 public final class AzureConfigs {
-    public static final String HOST = "azure.storage.queue.host";
-    public static final String PORT = "azure.storage.queue.port";
-    public static final String ACCOUNT_NAME = "azure.storage.queue.account.name";
-    public static final String ACCOUNT_KEY = "azure.storage.queue.account.key";
+    public static final String HOST = "azure.storage.host";
+    public static final String PORT = "azure.storage.port";
+    public static final String ACCOUNT_NAME = "azure.storage.account.name";
+    public static final String ACCOUNT_KEY = "azure.storage.account.key";
 
     private AzureConfigs() {
 
diff --git a/tests/itests-azure-storage-blob/pom.xml b/tests/itests-azure-storage-blob/pom.xml
new file mode 100644
index 0000000..a16bfe8
--- /dev/null
+++ b/tests/itests-azure-storage-blob/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.6.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <name>Camel-Kafka-Connector :: Tests :: Azure Storage Blob</name>
+    <artifactId>itests-azure-storage-blob</artifactId>
+
+    <properties>
+        <version.netty.azure>4.1.49.Final</version.netty.azure>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-azure-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!--
+            NOTE: the only reason we have this one here is because there is a conflict between the
+            version of Netty used by the azure client and the one used within Kafka Connect.
+            This leads to strange issues such as NoSuchMethodExceptions. To work-around this, we
+            explicitly include the Netty version used by Azure here, so it forces the one from Kafka
+            Connect out.
+            -->
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
+            <version>${version.netty.azure}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-azure-storage-blob</artifactId>
+        </dependency>
+
+    </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
new file mode 100644
index 0000000..33e3c42
--- /dev/null
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.blob.sink;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobItem;
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
+import org.apache.camel.kafkaconnector.azure.common.services.AzureService;
+import org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageBlobServiceFactory;
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static AzureService<BlobServiceClient> service = AzureStorageBlobServiceFactory.createAzureService();
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAzureStorageBlobITCase.class);
+
+    private BlobServiceClient client;
+    private BlobContainerClient blobContainerClient;
+    private String blobContainerName;
+    private Map<String, String> sentData = new HashMap<>();
+
+    private int expect = 10;
+    private int received;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[]{"camel-azure-storage-blob-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUpBlob() {
+        client = service.getClient();
+
+        blobContainerName = "test-" +  TestUtils.randomWithRange(1, 100);
+        blobContainerClient = client.createBlobContainer(blobContainerName);
+        received = 0;
+    }
+
+    private boolean canConsume() {
+        return blobContainerClient.exists() && blobContainerClient.listBlobs().stream().count() > 0;
+    }
+
+    private void consume() {
+        LOG.debug("Created the consumer ...");
+        TestUtils.waitFor(this::canConsume);
+
+        for (BlobItem blobContainerItem : blobContainerClient.listBlobs()) {
+            String receivedFile = blobContainerItem.getName();
+            assertTrue(sentData.containsKey(receivedFile));
+
+            // TODO: check the file contents in the future
+            received++;
+        }
+    }
+
+    private void putRecords() {
+        Map<String, String> messageParameters = new HashMap<>();
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        for (int i = 0; i < expect; i++) {
+            try {
+                String sentFile = "test " + i;
+                String sentText = "test " + i + " data";
+
+                sentData.put(sentFile, sentText);
+
+                messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageBlobBlobName", sentFile);
+
+                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), sentText, messageParameters);
+            } catch (ExecutionException e) {
+                LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+            } catch (InterruptedException e) {
+                break;
+            }
+        }
+    }
+
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException, IOException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+
+        putRecords();
+
+        consume();
+
+        assertEquals(expect, received, "Did not receive the same amount of messages that were sent");
+    }
+
+    @Disabled("Disabled due to issue #409")
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws InterruptedException, ExecutionException, IOException {
+        AzureCredentialsHolder azureCredentialsHolder = service.azureCredentials();
+
+        ConnectorPropertyFactory factory = CamelSinkAzureStorageBlobPropertyFactory
+                .basic()
+                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withConfiguration(TestBlobConfiguration.class.getName())
+                .withAccessKey(azureCredentialsHolder.accountKey())
+                .withAccountName(azureCredentialsHolder.accountName())
+                .withContainerName(blobContainerName)
+                .withOperation("uploadBlockBlob");
+
+        runTest(factory);
+    }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceiveUrl() throws InterruptedException, ExecutionException, IOException {
+        AzureCredentialsHolder azureCredentialsHolder = service.azureCredentials();
+
+        ConnectorPropertyFactory factory = CamelSinkAzureStorageBlobPropertyFactory
+                .basic()
+                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withConfiguration(TestBlobConfiguration.class.getName())
+                .withUrl(azureCredentialsHolder.accountName() + "/" + blobContainerName)
+                    .append("accessKey", azureCredentialsHolder.accountKey())
+                    .append("operation", "uploadBlockBlob")
+                    .buildUrl();
+
+        runTest(factory);
+    }
+}
diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobPropertyFactory.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobPropertyFactory.java
new file mode 100644
index 0000000..48491a8
--- /dev/null
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobPropertyFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.blob.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+public class CamelSinkAzureStorageBlobPropertyFactory extends SinkConnectorPropertyFactory<CamelSinkAzureStorageBlobPropertyFactory> {
+
+    public CamelSinkAzureStorageBlobPropertyFactory withAccountName(String value) {
+        return setProperty("camel.sink.path.accountName", value);
+    }
+
+    public CamelSinkAzureStorageBlobPropertyFactory withContainerName(String value) {
+        return setProperty("camel.sink.path.containerName", value);
+    }
+
+    public CamelSinkAzureStorageBlobPropertyFactory withAccessKey(String value) {
+        return setProperty("camel.sink.endpoint.accessKey", value);
+    }
+
+    public CamelSinkAzureStorageBlobPropertyFactory withBlobName(String value) {
+        return setProperty("camel.component.azure-storage-blob.blobName", value);
+    }
+
+    public CamelSinkAzureStorageBlobPropertyFactory withConfiguration(String configurationClass) {
+        return setProperty("camel.component.azure-storage-blob.configuration", classRef(configurationClass));
+    }
+
+    public CamelSinkAzureStorageBlobPropertyFactory withOperation(String value) {
+        return setProperty("camel.sink.endpoint.operation", value);
+    }
+
+    public EndpointUrlBuilder<CamelSinkAzureStorageBlobPropertyFactory> withUrl(String path) {
+        String sinkUrl = String.format("azure-storage-blob://%s", path);
+    
+        return new EndpointUrlBuilder<>(this::withSinkUrl, sinkUrl);
+    }
+
+
+    public static CamelSinkAzureStorageBlobPropertyFactory basic() {
+        return new CamelSinkAzureStorageBlobPropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelAzurestorageblobSinkConnector")
+                    .withConnectorClass("org.apache.camel.kafkaconnector.azurestorageblob.CamelAzurestorageblobSinkConnector")
+                    .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+
+    }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/TestBlobConfiguration.java
similarity index 50%
rename from tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java
rename to tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/TestBlobConfiguration.java
index 3a55fe2..93e1cb8 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/TestBlobConfiguration.java
@@ -15,15 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kafkaconnector.azure.storage.common;
+package org.apache.camel.kafkaconnector.azure.storage.blob.sink;
 
-public final class AzureConfigs {
-    public static final String HOST = "azure.storage.queue.host";
-    public static final String PORT = "azure.storage.queue.port";
-    public static final String ACCOUNT_NAME = "azure.storage.queue.account.name";
-    public static final String ACCOUNT_KEY = "azure.storage.queue.account.key";
+import com.azure.storage.blob.BlobServiceClient;
+import org.apache.camel.component.azure.storage.blob.BlobConfiguration;
+import org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageBlobClientUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-    private AzureConfigs() {
+public class TestBlobConfiguration extends BlobConfiguration {
+    private static final Logger LOG = LoggerFactory.getLogger(TestBlobConfiguration.class);
+    private BlobServiceClient serviceClient;
 
+    @Override
+    public BlobServiceClient getServiceClient() {
+        if (serviceClient == null) {
+            LOG.info("Creating a custom BlobServiceClient");
+            serviceClient = AzureStorageBlobClientUtils.getClient();
+        }
+
+        return serviceClient;
     }
 }
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobClientUtils.java
similarity index 85%
copy from tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java
copy to tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobClientUtils.java
index 5087c07..61f42fa 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobClientUtils.java
@@ -19,18 +19,18 @@ package org.apache.camel.kafkaconnector.azure.storage.services;
 
 import com.azure.core.http.policy.HttpLogDetailLevel;
 import com.azure.core.http.policy.HttpLogOptions;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
 import com.azure.storage.common.StorageSharedKeyCredential;
-import com.azure.storage.queue.QueueServiceClient;
-import com.azure.storage.queue.QueueServiceClientBuilder;
-import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
+import org.apache.camel.kafkaconnector.azure.common.AzureConfigs;
 
-public final class AzureStorageClientUtils {
+public final class AzureStorageBlobClientUtils {
 
-    private AzureStorageClientUtils() {
+    private AzureStorageBlobClientUtils() {
 
     }
 
-    public static QueueServiceClient getClient() {
+    public static BlobServiceClient getClient() {
         String instanceType = System.getProperty("azure.instance.type");
 
         String accountName = System.getProperty(AzureConfigs.ACCOUNT_NAME);
@@ -52,7 +52,7 @@ public final class AzureStorageClientUtils {
             }
         }
 
-        return new QueueServiceClientBuilder()
+        return new BlobServiceClientBuilder()
                 .endpoint(endpoint)
                 .credential(credential)
                 .httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BODY_AND_HEADERS).setPrettyPrintBody(true))
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobLocalContainerService.java
similarity index 81%
copy from tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java
copy to tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobLocalContainerService.java
index 6aeae34..902846d 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobLocalContainerService.java
@@ -17,13 +17,13 @@
 
 package org.apache.camel.kafkaconnector.azure.storage.services;
 
-import com.azure.storage.queue.QueueServiceClient;
+import com.azure.storage.blob.BlobServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.AzureConfigs;
 import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
 import org.apache.camel.kafkaconnector.azure.common.services.AzureServices;
 import org.apache.camel.kafkaconnector.azure.common.services.AzureStorageService;
-import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
 
-public class AzureStorageQueueLocalContainerService extends AzureStorageService<QueueServiceClient> {
+public class AzureStorageBlobLocalContainerService extends AzureStorageService<BlobServiceClient> {
 
     @Override
     public void initialize() {
@@ -32,7 +32,7 @@ public class AzureStorageQueueLocalContainerService extends AzureStorageService<
         System.setProperty(AzureConfigs.ACCOUNT_NAME, getContainer().azureCredentials().accountName());
         System.setProperty(AzureConfigs.ACCOUNT_KEY, getContainer().azureCredentials().accountKey());
         System.setProperty(AzureConfigs.HOST, getContainer().getContainerIpAddress());
-        System.setProperty(AzureConfigs.PORT, String.valueOf(getContainer().getMappedPort(AzureServices.QUEUE_SERVICE)));
+        System.setProperty(AzureConfigs.PORT, String.valueOf(getContainer().getMappedPort(AzureServices.BLOB_SERVICE)));
     }
 
     @Override
@@ -41,7 +41,7 @@ public class AzureStorageQueueLocalContainerService extends AzureStorageService<
     }
 
     @Override
-    public QueueServiceClient getClient() {
-        return AzureStorageClientUtils.getClient();
+    public BlobServiceClient getClient() {
+        return AzureStorageBlobClientUtils.getClient();
     }
 }
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobRemoteService.java
similarity index 84%
copy from tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java
copy to tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobRemoteService.java
index 12912fd..35a3616 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobRemoteService.java
@@ -17,12 +17,12 @@
 
 package org.apache.camel.kafkaconnector.azure.storage.services;
 
-import com.azure.storage.queue.QueueServiceClient;
+import com.azure.storage.blob.BlobServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.AzureConfigs;
 import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
 import org.apache.camel.kafkaconnector.azure.common.services.AzureService;
-import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
 
-public class AzureStorageQueueRemoteService implements AzureService<QueueServiceClient> {
+public class AzureStorageBlobRemoteService implements AzureService<BlobServiceClient> {
 
     @Override
     public void initialize() {
@@ -51,7 +51,7 @@ public class AzureStorageQueueRemoteService implements AzureService<QueueService
     }
 
     @Override
-    public QueueServiceClient getClient() {
-        return AzureStorageClientUtils.getClient();
+    public BlobServiceClient getClient() {
+        return AzureStorageBlobClientUtils.getClient();
     }
 }
diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobServiceFactory.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobServiceFactory.java
new file mode 100644
index 0000000..cd7ad0d
--- /dev/null
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobServiceFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.services;
+
+import com.azure.storage.blob.BlobServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.services.AzureService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class AzureStorageBlobServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(AzureStorageBlobServiceFactory.class);
+
+    private AzureStorageBlobServiceFactory() {
+
+    }
+
+    public static AzureService<BlobServiceClient> createAzureService() {
+        String instanceType = System.getProperty("azure.instance.type");
+
+        if (instanceType == null || instanceType.equals("local-azure-container")) {
+            return new AzureStorageBlobLocalContainerService();
+        }
+
+        if (instanceType.equals("remote")) {
+            return new AzureStorageBlobRemoteService();
+        }
+
+        LOG.error("Azure instance must be one of 'local-azure-container' or 'remote");
+        throw new UnsupportedOperationException(String.format("Invalid Azure instance type: %s", instanceType));
+    }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java
index 5087c07..451776f 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java
@@ -22,7 +22,7 @@ import com.azure.core.http.policy.HttpLogOptions;
 import com.azure.storage.common.StorageSharedKeyCredential;
 import com.azure.storage.queue.QueueServiceClient;
 import com.azure.storage.queue.QueueServiceClientBuilder;
-import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
+import org.apache.camel.kafkaconnector.azure.common.AzureConfigs;
 
 public final class AzureStorageClientUtils {
 
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java
index 6aeae34..60c4f3f 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java
@@ -18,10 +18,10 @@
 package org.apache.camel.kafkaconnector.azure.storage.services;
 
 import com.azure.storage.queue.QueueServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.AzureConfigs;
 import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
 import org.apache.camel.kafkaconnector.azure.common.services.AzureServices;
 import org.apache.camel.kafkaconnector.azure.common.services.AzureStorageService;
-import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
 
 public class AzureStorageQueueLocalContainerService extends AzureStorageService<QueueServiceClient> {
 
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java
index 12912fd..8f1afb5 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java
@@ -18,9 +18,9 @@
 package org.apache.camel.kafkaconnector.azure.storage.services;
 
 import com.azure.storage.queue.QueueServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.AzureConfigs;
 import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
 import org.apache.camel.kafkaconnector.azure.common.services.AzureService;
-import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
 
 public class AzureStorageQueueRemoteService implements AzureService<QueueServiceClient> {
 
diff --git a/tests/pom.xml b/tests/pom.xml
index c95de51..dea3592 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -57,6 +57,7 @@
         <module>itests-mongodb</module>
         <module>itests-jdbc</module>
         <module>itests-azure-common</module>
+        <module>itests-azure-storage-blob</module>
         <module>itests-azure-storage-queue</module>
         <module>perf-tests-rabbitmq</module>
         <module>itests-rabbitmq</module>