You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/10/13 10:49:19 UTC
[camel-kafka-connector] branch master updated: Added new test for
Azure Storage Blob sink
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 24db35f Added new test for Azure Storage Blob sink
24db35f is described below
commit 24db35f62bb72561024f1015714a5745a777ee7a
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Oct 12 09:48:04 2020 +0200
Added new test for Azure Storage Blob sink
---
.../kafkaconnector/azure}/common/AzureConfigs.java | 10 +-
tests/itests-azure-storage-blob/pom.xml | 76 ++++++++++
.../blob/sink/CamelSinkAzureStorageBlobITCase.java | 159 +++++++++++++++++++++
.../CamelSinkAzureStorageBlobPropertyFactory.java | 65 +++++++++
.../storage/blob/sink/TestBlobConfiguration.java} | 24 +++-
.../services/AzureStorageBlobClientUtils.java} | 14 +-
.../AzureStorageBlobLocalContainerService.java} | 12 +-
.../services/AzureStorageBlobRemoteService.java} | 10 +-
.../services/AzureStorageBlobServiceFactory.java | 46 ++++++
.../storage/services/AzureStorageClientUtils.java | 2 +-
.../AzureStorageQueueLocalContainerService.java | 2 +-
.../services/AzureStorageQueueRemoteService.java | 2 +-
tests/pom.xml | 1 +
13 files changed, 390 insertions(+), 33 deletions(-)
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/AzureConfigs.java
similarity index 71%
copy from tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java
copy to tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/AzureConfigs.java
index 3a55fe2..b90bf80 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java
+++ b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/AzureConfigs.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.camel.kafkaconnector.azure.storage.common;
+package org.apache.camel.kafkaconnector.azure.common;
public final class AzureConfigs {
- public static final String HOST = "azure.storage.queue.host";
- public static final String PORT = "azure.storage.queue.port";
- public static final String ACCOUNT_NAME = "azure.storage.queue.account.name";
- public static final String ACCOUNT_KEY = "azure.storage.queue.account.key";
+ public static final String HOST = "azure.storage.host";
+ public static final String PORT = "azure.storage.port";
+ public static final String ACCOUNT_NAME = "azure.storage.account.name";
+ public static final String ACCOUNT_KEY = "azure.storage.account.key";
private AzureConfigs() {
diff --git a/tests/itests-azure-storage-blob/pom.xml b/tests/itests-azure-storage-blob/pom.xml
new file mode 100644
index 0000000..a16bfe8
--- /dev/null
+++ b/tests/itests-azure-storage-blob/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-parent</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ <relativePath>../itests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <name>Camel-Kafka-Connector :: Tests :: Azure Storage Blob</name>
+ <artifactId>itests-azure-storage-blob</artifactId>
+
+ <properties>
+ <version.netty.azure>4.1.49.Final</version.netty.azure>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-azure-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!--
+ NOTE: the only reason we have this one here is because there is a conflict between the
+ version of Netty used by the azure client and the one used within Kafka Connect.
+ This leads to strange issues such as NoSuchMethodExceptions. To work around this, we
+ explicitly include the Netty version used by Azure here, so it forces the one from Kafka
+ Connect out.
+ -->
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ <version>${version.netty.azure}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-azure-storage-blob</artifactId>
+ </dependency>
+
+ </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
new file mode 100644
index 0000000..33e3c42
--- /dev/null
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.blob.sink;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobItem;
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
+import org.apache.camel.kafkaconnector.azure.common.services.AzureService;
+import org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageBlobServiceFactory;
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test for the Camel Azure Storage Blob sink connector.
+ *
+ * <p>Each test publishes {@code expect} records to the default test topic, passing the target blob
+ * name as a Camel header, and then verifies that the blobs listed in the test container carry the
+ * names that were sent.
+ */
+public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static AzureService<BlobServiceClient> service = AzureStorageBlobServiceFactory.createAzureService();
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAzureStorageBlobITCase.class);
+
+    private BlobServiceClient client;
+    private BlobContainerClient blobContainerClient;
+    private String blobContainerName;
+    // Blob name -> payload for every record handed to the Kafka producer.
+    private Map<String, String> sentData = new HashMap<>();
+
+    private int expect = 10;
+    private int received;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[]{"camel-azure-storage-blob-kafka-connector"};
+    }
+
+    /**
+     * Obtains the blob service client from the registered Azure service, creates the container
+     * used by the test (named {@code test-<n>} with {@code n} taken from
+     * {@code TestUtils.randomWithRange(1, 100)}) and resets the received counter.
+     */
+    @BeforeEach
+    public void setUpBlob() {
+        client = service.getClient();
+
+        blobContainerName = "test-" + TestUtils.randomWithRange(1, 100);
+        blobContainerClient = client.createBlobContainer(blobContainerName);
+        received = 0;
+    }
+
+    /**
+     * Polling condition for {@link #consume()}.
+     *
+     * <p>Returns true only once the container exists and lists at least {@code expect} blobs.
+     * The records reach the container through the connector rather than from this thread, so
+     * checking for just the first blob would let {@link #consume()} count a partial batch.
+     */
+    private boolean canConsume() {
+        return blobContainerClient.exists() && blobContainerClient.listBlobs().stream().count() >= expect;
+    }
+
+    /**
+     * Waits until {@link #canConsume()} holds, then counts the blobs in the container, asserting
+     * that every blob name is one of the names that were sent.
+     */
+    private void consume() {
+        LOG.debug("Created the consumer ...");
+        TestUtils.waitFor(this::canConsume);
+
+        for (BlobItem blobContainerItem : blobContainerClient.listBlobs()) {
+            String receivedFile = blobContainerItem.getName();
+            assertTrue(sentData.containsKey(receivedFile));
+
+            // TODO: check the file contents in the future
+            received++;
+        }
+    }
+
+    /**
+     * Produces {@code expect} records to the default test topic. Each record carries its target
+     * blob name in the {@code CamelAzureStorageBlobBlobName} header and is remembered in
+     * {@code sentData} for later verification.
+     */
+    private void putRecords() {
+        Map<String, String> messageParameters = new HashMap<>();
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        for (int i = 0; i < expect; i++) {
+            try {
+                String sentFile = "test " + i;
+                String sentText = "test " + i + " data";
+
+                sentData.put(sentFile, sentText);
+
+                messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageBlobBlobName", sentFile);
+
+                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), sentText, messageParameters);
+            } catch (ExecutionException e) {
+                LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so the interruption is not lost, then stop producing.
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+    }
+
+    /**
+     * Starts the connector described by the given factory, sends the records, reads the container
+     * back and asserts that the number of blobs found equals the number of records sent.
+     *
+     * @param connectorPropertyFactory the connector configuration to run the test with
+     */
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException, IOException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+
+        putRecords();
+
+        consume();
+
+        assertEquals(expect, received, "Did not receive the same amount of messages that were sent");
+    }
+
+    /** Runs the send/receive scenario with the connector configured through individual properties. */
+    @Disabled("Disabled due to issue #409")
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws InterruptedException, ExecutionException, IOException {
+        AzureCredentialsHolder azureCredentialsHolder = service.azureCredentials();
+
+        ConnectorPropertyFactory factory = CamelSinkAzureStorageBlobPropertyFactory
+                .basic()
+                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withConfiguration(TestBlobConfiguration.class.getName())
+                .withAccessKey(azureCredentialsHolder.accountKey())
+                .withAccountName(azureCredentialsHolder.accountName())
+                .withContainerName(blobContainerName)
+                .withOperation("uploadBlockBlob");
+
+        runTest(factory);
+    }
+
+    /** Runs the send/receive scenario with the connector configured through a sink endpoint URL. */
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceiveUrl() throws InterruptedException, ExecutionException, IOException {
+        AzureCredentialsHolder azureCredentialsHolder = service.azureCredentials();
+
+        ConnectorPropertyFactory factory = CamelSinkAzureStorageBlobPropertyFactory
+                .basic()
+                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withConfiguration(TestBlobConfiguration.class.getName())
+                .withUrl(azureCredentialsHolder.accountName() + "/" + blobContainerName)
+                .append("accessKey", azureCredentialsHolder.accountKey())
+                .append("operation", "uploadBlockBlob")
+                .buildUrl();
+
+        runTest(factory);
+    }
+}
diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobPropertyFactory.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobPropertyFactory.java
new file mode 100644
index 0000000..48491a8
--- /dev/null
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobPropertyFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.blob.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+/**
+ * Fluent property factory for the Azure Storage Blob sink connector used by the integration tests.
+ * Each {@code with*} method stores one connector property and returns the factory for chaining.
+ */
+public class CamelSinkAzureStorageBlobPropertyFactory extends SinkConnectorPropertyFactory<CamelSinkAzureStorageBlobPropertyFactory> {
+
+    private static final String ACCOUNT_NAME_KEY = "camel.sink.path.accountName";
+    private static final String CONTAINER_NAME_KEY = "camel.sink.path.containerName";
+    private static final String ACCESS_KEY_KEY = "camel.sink.endpoint.accessKey";
+    private static final String OPERATION_KEY = "camel.sink.endpoint.operation";
+    private static final String BLOB_NAME_KEY = "camel.component.azure-storage-blob.blobName";
+    private static final String CONFIGURATION_KEY = "camel.component.azure-storage-blob.configuration";
+
+    /**
+     * Creates a factory preset with a single task, the connector name and class, and string
+     * converters for both keys and values.
+     */
+    public static CamelSinkAzureStorageBlobPropertyFactory basic() {
+        CamelSinkAzureStorageBlobPropertyFactory factory = new CamelSinkAzureStorageBlobPropertyFactory();
+
+        return factory
+                .withTasksMax(1)
+                .withName("CamelAzurestorageblobSinkConnector")
+                .withConnectorClass("org.apache.camel.kafkaconnector.azurestorageblob.CamelAzurestorageblobSinkConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+
+    /** Sets the account name path parameter of the sink. */
+    public CamelSinkAzureStorageBlobPropertyFactory withAccountName(String value) {
+        return setProperty(ACCOUNT_NAME_KEY, value);
+    }
+
+    /** Sets the container name path parameter of the sink. */
+    public CamelSinkAzureStorageBlobPropertyFactory withContainerName(String value) {
+        return setProperty(CONTAINER_NAME_KEY, value);
+    }
+
+    /** Sets the access key endpoint option of the sink. */
+    public CamelSinkAzureStorageBlobPropertyFactory withAccessKey(String value) {
+        return setProperty(ACCESS_KEY_KEY, value);
+    }
+
+    /** Sets the operation endpoint option of the sink. */
+    public CamelSinkAzureStorageBlobPropertyFactory withOperation(String value) {
+        return setProperty(OPERATION_KEY, value);
+    }
+
+    /** Sets the blob name at the component level. */
+    public CamelSinkAzureStorageBlobPropertyFactory withBlobName(String value) {
+        return setProperty(BLOB_NAME_KEY, value);
+    }
+
+    /** Sets the component configuration to a class reference built from the given class name. */
+    public CamelSinkAzureStorageBlobPropertyFactory withConfiguration(String configurationClass) {
+        return setProperty(CONFIGURATION_KEY, classRef(configurationClass));
+    }
+
+    /**
+     * Starts building a sink URL of the form {@code azure-storage-blob://<path>}; the finished URL
+     * is handed to {@code withSinkUrl} by the returned builder.
+     */
+    public EndpointUrlBuilder<CamelSinkAzureStorageBlobPropertyFactory> withUrl(String path) {
+        return new EndpointUrlBuilder<>(this::withSinkUrl, String.format("azure-storage-blob://%s", path));
+    }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/TestBlobConfiguration.java
similarity index 50%
rename from tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java
rename to tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/TestBlobConfiguration.java
index 3a55fe2..93e1cb8 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/TestBlobConfiguration.java
@@ -15,15 +15,25 @@
* limitations under the License.
*/
-package org.apache.camel.kafkaconnector.azure.storage.common;
+package org.apache.camel.kafkaconnector.azure.storage.blob.sink;
-public final class AzureConfigs {
- public static final String HOST = "azure.storage.queue.host";
- public static final String PORT = "azure.storage.queue.port";
- public static final String ACCOUNT_NAME = "azure.storage.queue.account.name";
- public static final String ACCOUNT_KEY = "azure.storage.queue.account.key";
+import com.azure.storage.blob.BlobServiceClient;
+import org.apache.camel.component.azure.storage.blob.BlobConfiguration;
+import org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageBlobClientUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- private AzureConfigs() {
+public class TestBlobConfiguration extends BlobConfiguration { // BlobConfiguration whose service client is supplied by the test utilities
+ private static final Logger LOG = LoggerFactory.getLogger(TestBlobConfiguration.class);
+ private BlobServiceClient serviceClient; // cached client; stays null until getServiceClient() is first called
+ @Override
+ public BlobServiceClient getServiceClient() { // returns the cached client, creating it on first use
+ if (serviceClient == null) {
+ LOG.info("Creating a custom BlobServiceClient");
+ serviceClient = AzureStorageBlobClientUtils.getClient(); // obtained from the test helper; the parent implementation is not consulted
+ }
+
+ return serviceClient;
}
}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobClientUtils.java
similarity index 85%
copy from tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java
copy to tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobClientUtils.java
index 5087c07..61f42fa 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobClientUtils.java
@@ -19,18 +19,18 @@ package org.apache.camel.kafkaconnector.azure.storage.services;
import com.azure.core.http.policy.HttpLogDetailLevel;
import com.azure.core.http.policy.HttpLogOptions;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
-import com.azure.storage.queue.QueueServiceClient;
-import com.azure.storage.queue.QueueServiceClientBuilder;
-import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
+import org.apache.camel.kafkaconnector.azure.common.AzureConfigs;
-public final class AzureStorageClientUtils {
+public final class AzureStorageBlobClientUtils {
- private AzureStorageClientUtils() {
+ private AzureStorageBlobClientUtils() {
}
- public static QueueServiceClient getClient() {
+ public static BlobServiceClient getClient() {
String instanceType = System.getProperty("azure.instance.type");
String accountName = System.getProperty(AzureConfigs.ACCOUNT_NAME);
@@ -52,7 +52,7 @@ public final class AzureStorageClientUtils {
}
}
- return new QueueServiceClientBuilder()
+ return new BlobServiceClientBuilder()
.endpoint(endpoint)
.credential(credential)
.httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BODY_AND_HEADERS).setPrettyPrintBody(true))
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobLocalContainerService.java
similarity index 81%
copy from tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java
copy to tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobLocalContainerService.java
index 6aeae34..902846d 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobLocalContainerService.java
@@ -17,13 +17,13 @@
package org.apache.camel.kafkaconnector.azure.storage.services;
-import com.azure.storage.queue.QueueServiceClient;
+import com.azure.storage.blob.BlobServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.AzureConfigs;
import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
import org.apache.camel.kafkaconnector.azure.common.services.AzureServices;
import org.apache.camel.kafkaconnector.azure.common.services.AzureStorageService;
-import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
-public class AzureStorageQueueLocalContainerService extends AzureStorageService<QueueServiceClient> {
+public class AzureStorageBlobLocalContainerService extends AzureStorageService<BlobServiceClient> {
@Override
public void initialize() {
@@ -32,7 +32,7 @@ public class AzureStorageQueueLocalContainerService extends AzureStorageService<
System.setProperty(AzureConfigs.ACCOUNT_NAME, getContainer().azureCredentials().accountName());
System.setProperty(AzureConfigs.ACCOUNT_KEY, getContainer().azureCredentials().accountKey());
System.setProperty(AzureConfigs.HOST, getContainer().getContainerIpAddress());
- System.setProperty(AzureConfigs.PORT, String.valueOf(getContainer().getMappedPort(AzureServices.QUEUE_SERVICE)));
+ System.setProperty(AzureConfigs.PORT, String.valueOf(getContainer().getMappedPort(AzureServices.BLOB_SERVICE)));
}
@Override
@@ -41,7 +41,7 @@ public class AzureStorageQueueLocalContainerService extends AzureStorageService<
}
@Override
- public QueueServiceClient getClient() {
- return AzureStorageClientUtils.getClient();
+ public BlobServiceClient getClient() {
+ return AzureStorageBlobClientUtils.getClient();
}
}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobRemoteService.java
similarity index 84%
copy from tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java
copy to tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobRemoteService.java
index 12912fd..35a3616 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobRemoteService.java
@@ -17,12 +17,12 @@
package org.apache.camel.kafkaconnector.azure.storage.services;
-import com.azure.storage.queue.QueueServiceClient;
+import com.azure.storage.blob.BlobServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.AzureConfigs;
import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
import org.apache.camel.kafkaconnector.azure.common.services.AzureService;
-import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
-public class AzureStorageQueueRemoteService implements AzureService<QueueServiceClient> {
+public class AzureStorageBlobRemoteService implements AzureService<BlobServiceClient> {
@Override
public void initialize() {
@@ -51,7 +51,7 @@ public class AzureStorageQueueRemoteService implements AzureService<QueueService
}
@Override
- public QueueServiceClient getClient() {
- return AzureStorageClientUtils.getClient();
+ public BlobServiceClient getClient() {
+ return AzureStorageBlobClientUtils.getClient();
}
}
diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobServiceFactory.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobServiceFactory.java
new file mode 100644
index 0000000..cd7ad0d
--- /dev/null
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobServiceFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.services;
+
+import com.azure.storage.blob.BlobServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.services.AzureService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Chooses the Azure Storage Blob test service implementation based on the
+ * {@code azure.instance.type} system property.
+ */
+public final class AzureStorageBlobServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(AzureStorageBlobServiceFactory.class);
+
+    private AzureStorageBlobServiceFactory() {
+        // static factory only; not meant to be instantiated
+    }
+
+    /**
+     * Creates the service matching the {@code azure.instance.type} system property.
+     *
+     * @return an {@code AzureStorageBlobLocalContainerService} when the property is unset or equals
+     *         {@code local-azure-container}; an {@code AzureStorageBlobRemoteService} when it
+     *         equals {@code remote}
+     * @throws UnsupportedOperationException if the property holds any other value
+     */
+    public static AzureService<BlobServiceClient> createAzureService() {
+        String instanceType = System.getProperty("azure.instance.type");
+
+        if (instanceType == null || instanceType.equals("local-azure-container")) {
+            return new AzureStorageBlobLocalContainerService();
+        }
+
+        if (instanceType.equals("remote")) {
+            return new AzureStorageBlobRemoteService();
+        }
+
+        // The original message was missing the closing quote after 'remote.
+        LOG.error("Azure instance must be one of 'local-azure-container' or 'remote'");
+        throw new UnsupportedOperationException(String.format("Invalid Azure instance type: %s", instanceType));
+    }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java
index 5087c07..451776f 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java
@@ -22,7 +22,7 @@ import com.azure.core.http.policy.HttpLogOptions;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.queue.QueueServiceClient;
import com.azure.storage.queue.QueueServiceClientBuilder;
-import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
+import org.apache.camel.kafkaconnector.azure.common.AzureConfigs;
public final class AzureStorageClientUtils {
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java
index 6aeae34..60c4f3f 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java
@@ -18,10 +18,10 @@
package org.apache.camel.kafkaconnector.azure.storage.services;
import com.azure.storage.queue.QueueServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.AzureConfigs;
import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
import org.apache.camel.kafkaconnector.azure.common.services.AzureServices;
import org.apache.camel.kafkaconnector.azure.common.services.AzureStorageService;
-import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
public class AzureStorageQueueLocalContainerService extends AzureStorageService<QueueServiceClient> {
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java
index 12912fd..8f1afb5 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java
@@ -18,9 +18,9 @@
package org.apache.camel.kafkaconnector.azure.storage.services;
import com.azure.storage.queue.QueueServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.AzureConfigs;
import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
import org.apache.camel.kafkaconnector.azure.common.services.AzureService;
-import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
public class AzureStorageQueueRemoteService implements AzureService<QueueServiceClient> {
diff --git a/tests/pom.xml b/tests/pom.xml
index c95de51..dea3592 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -57,6 +57,7 @@
<module>itests-mongodb</module>
<module>itests-jdbc</module>
<module>itests-azure-common</module>
+ <module>itests-azure-storage-blob</module>
<module>itests-azure-storage-queue</module>
<module>perf-tests-rabbitmq</module>
<module>itests-rabbitmq</module>