You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/09/02 12:48:32 UTC
[camel-kafka-connector] branch master updated: Added new test for
Azure Storage Queue sink
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 0b0b626 Added new test for Azure Storage Queue sink
0b0b626 is described below
commit 0b0b6267e9472e886f74d4a579f5535b7b95dac4
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Aug 31 14:18:43 2020 +0200
Added new test for Azure Storage Queue sink
---
README.adoc | 5 +
tests/itests-azure-common/pom.xml | 61 ++++++++
.../azure/common/AzureCredentialsHolder.java | 23 +++
.../azure/common/services/AzureService.java | 56 +++++++
.../azure/common/services/AzureServices.java | 26 ++++
.../azure/common/services/AzureStorageService.java | 47 ++++++
.../azure/common/services/AzuriteContainer.java | 56 +++++++
tests/itests-azure-storage-queue/pom.xml | 76 ++++++++++
.../azure/storage/common/AzureConfigs.java | 29 ++++
.../sink/CamelSinkAzureStorageQueueITCase.java | 166 +++++++++++++++++++++
.../CamelSinkAzureStorageQueuePropertyFactory.java | 61 ++++++++
.../storage/queue/sink/TestQueueConfiguration.java | 39 +++++
.../storage/services/AzureStorageClientUtils.java | 61 ++++++++
.../AzureStorageQueueLocalContainerService.java | 47 ++++++
.../services/AzureStorageQueueRemoteService.java | 57 +++++++
.../services/AzureStorageQueueServiceFactory.java | 46 ++++++
.../src/test/resources/log4j2.properties | 8 +-
tests/pom.xml | 2 +
18 files changed, 865 insertions(+), 1 deletion(-)
diff --git a/README.adoc b/README.adoc
index b6e614a..4da89c8 100644
--- a/README.adoc
+++ b/README.adoc
@@ -51,6 +51,11 @@ for remote testing:
** secret.key: AWS secret key (mandatory for remote testing)
** aws.region: AWS region (optional)
** aws.host: AWS host (optional)
+* azure.instance.type
+** azure.storage.queue.host
+** azure.storage.queue.port
+** azure.storage.queue.account.name
+** azure.storage.queue.account.key
* elasticsearch.instance.type
** elasticsearch.host
** elasticsearch.port
diff --git a/tests/itests-azure-common/pom.xml b/tests/itests-azure-common/pom.xml
new file mode 100644
index 0000000..710d998
--- /dev/null
+++ b/tests/itests-azure-common/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-parent</artifactId>
+ <version>0.5.0-SNAPSHOT</version>
+ <relativePath>../itests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>itests-azure-common</artifactId>
+ <name>Camel-Kafka-Connector :: Tests :: Azure Common</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project>
\ No newline at end of file
diff --git a/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/AzureCredentialsHolder.java b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/AzureCredentialsHolder.java
new file mode 100644
index 0000000..7d337b5
--- /dev/null
+++ b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/AzureCredentialsHolder.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.common;
+
+public interface AzureCredentialsHolder {
+ String accountName();
+ String accountKey();
+}
diff --git a/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureService.java b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureService.java
new file mode 100644
index 0000000..5f43f10
--- /dev/null
+++ b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureService.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.common.services;
+
+import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public interface AzureService<T> extends BeforeAllCallback, AfterAllCallback {
+
+ /**
+ * Gets the credentials for the test service
+ * @return
+ */
+ AzureCredentialsHolder azureCredentials();
+
+
+ T getClient();
+
+ /**
+ * Perform any initialization necessary
+ */
+ void initialize();
+
+ /**
+ * Shuts down the service after the test has completed
+ */
+ void shutdown();
+
+
+ @Override
+ default void beforeAll(ExtensionContext extensionContext) throws Exception {
+ initialize();
+ }
+
+ @Override
+ default void afterAll(ExtensionContext extensionContext) throws Exception {
+ shutdown();
+ }
+}
diff --git a/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureServices.java b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureServices.java
new file mode 100644
index 0000000..b837215
--- /dev/null
+++ b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureServices.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.common.services;
+
+public final class AzureServices {
+ public static final int BLOB_SERVICE = 10000;
+ public static final int QUEUE_SERVICE = 10001;
+
+ private AzureServices() {
+ }
+}
diff --git a/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureStorageService.java b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureStorageService.java
new file mode 100644
index 0000000..8005285
--- /dev/null
+++ b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzureStorageService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.common.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AzureStorageService<T> implements AzureService<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(AzureStorageService.class);
+ private final AzuriteContainer container = new AzuriteContainer();
+
+ public AzureStorageService() {
+ container.start();
+ }
+
+ protected AzuriteContainer getContainer() {
+ return container;
+ }
+
+ @Override
+ public void initialize() {
+ LOG.info("Azurite local blob service running at address {}:{}", container.getHost(),
+ container.getMappedPort(AzureServices.BLOB_SERVICE));
+ LOG.info("Azurite local queue service running at address {}:{}", container.getHost(),
+ container.getMappedPort(AzureServices.QUEUE_SERVICE));
+ }
+
+ @Override
+ public void shutdown() {
+ container.stop();
+ }
+}
diff --git a/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzuriteContainer.java b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzuriteContainer.java
new file mode 100644
index 0000000..2bba8a7
--- /dev/null
+++ b/tests/itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/services/AzuriteContainer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.common.services;
+
+import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+public class AzuriteContainer extends GenericContainer<AzuriteContainer> {
+ public static final String DEFAULT_ACCOUNT_NAME = "devstoreaccount1";
+ public static final String DEFAULT_ACCOUNT_KEY = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+
+ private static final String CONTAINER_NAME = "mcr.microsoft.com/azure-storage/azurite:3.8.0";
+
+ public AzuriteContainer() {
+ this(CONTAINER_NAME);
+ }
+
+ public AzuriteContainer(String containerName) {
+ super(containerName);
+
+ withExposedPorts(AzureServices.BLOB_SERVICE, AzureServices.QUEUE_SERVICE);
+
+ waitingFor(Wait.forListeningPort());
+ }
+
+ public AzureCredentialsHolder azureCredentials() {
+ // Default credentials for Azurite
+ return new AzureCredentialsHolder() {
+ @Override
+ public String accountName() {
+ return DEFAULT_ACCOUNT_NAME;
+ }
+
+ @Override
+ public String accountKey() {
+ return DEFAULT_ACCOUNT_KEY;
+ }
+ };
+ }
+}
diff --git a/tests/itests-azure-storage-queue/pom.xml b/tests/itests-azure-storage-queue/pom.xml
new file mode 100644
index 0000000..a85b48a
--- /dev/null
+++ b/tests/itests-azure-storage-queue/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-parent</artifactId>
+ <version>0.5.0-SNAPSHOT</version>
+ <relativePath>../itests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>itests-azure-storage-queue</artifactId>
+ <name>Camel-Kafka-Connector :: Tests :: Azure Storage Queue</name>
+
+ <properties>
+ <version.netty.azure>4.1.49.Final</version.netty.azure>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-azure-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!--
+ NOTE: the only reason we have this one here is because there is a conflict between the
+ version of Netty used by the azure client and the one used within Kafka Connect.
+ This leads to strange issues such as NoSuchMethodExceptions. To work-around this, we
+ explicitly include the Netty version used by Azure here, so it forces the one from Kafka
+ Connect out.
+ -->
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ <version>${version.netty.azure}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-azure-storage-queue</artifactId>
+ </dependency>
+
+ </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java
new file mode 100644
index 0000000..3a55fe2
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/common/AzureConfigs.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.common;
+
+public final class AzureConfigs {
+ public static final String HOST = "azure.storage.queue.host";
+ public static final String PORT = "azure.storage.queue.port";
+ public static final String ACCOUNT_NAME = "azure.storage.queue.account.name";
+ public static final String ACCOUNT_KEY = "azure.storage.queue.account.key";
+
+ private AzureConfigs() {
+
+ }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
new file mode 100644
index 0000000..d9c0e60
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.queue.sink;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import com.azure.storage.queue.QueueClient;
+import com.azure.storage.queue.QueueServiceClient;
+import com.azure.storage.queue.models.PeekedMessageItem;
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
+import org.apache.camel.kafkaconnector.azure.common.services.AzureService;
+import org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageQueueServiceFactory;
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CamelSinkAzureStorageQueueITCase extends AbstractKafkaTest {
+ @RegisterExtension
+ public static AzureService<QueueServiceClient> service = AzureStorageQueueServiceFactory.createAzureService();
+
+ private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAzureStorageQueueITCase.class);
+
+ private QueueServiceClient client;
+ private QueueClient queueClient;
+ private String queueName;
+ private int expect = 10;
+ private int received;
+
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[]{"camel-azure-storage-queue-kafka-connector"};
+ }
+
+ @BeforeEach
+ public void setUp() {
+ client = service.getClient();
+ queueName = "test-queue" + TestUtils.randomWithRange(0, 100);
+
+ queueClient = client.createQueue(queueName);
+ received = 0;
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (client != null) {
+ client.deleteQueue(queueName);
+ }
+ }
+
+ private void acknowledgeReceived(PeekedMessageItem peekedMessageItem) {
+ received++;
+ LOG.info("Received: {}", peekedMessageItem.getMessageText());
+ }
+
+ private boolean canConsume() {
+ return queueClient.getProperties().getApproximateMessagesCount() >= expect;
+ }
+
+ private void consume() {
+ LOG.debug("Created the consumer ...");
+ TestUtils.waitFor(this::canConsume);
+
+ LOG.debug("About to receive messages");
+ int count = queueClient.getProperties().getApproximateMessagesCount();
+
+ queueClient.peekMessages(count, null, null).forEach(this::acknowledgeReceived);
+
+ }
+
+ private void putRecords() {
+ Map<String, String> messageParameters = new HashMap<>();
+ KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+ for (int i = 0; i < expect; i++) {
+ try {
+ // This is for 3.4 only. From 3.5 and newer, the text is taken from the body
+ messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageQueueMessageText", "test " + i);
+
+ kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test " + i, messageParameters);
+ } catch (ExecutionException e) {
+ LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+
+
+ public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException, IOException {
+ connectorPropertyFactory.log();
+ getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+
+ putRecords();
+
+ consume();
+
+ assertEquals(expect, received, "Did not receive the same amount of messages that were sent");
+ }
+
+
+ @Disabled("Disabled due to issue #409")
+ @Test
+ @Timeout(90)
+ public void testBasicSendReceive() throws InterruptedException, ExecutionException, IOException {
+ AzureCredentialsHolder azureCredentialsHolder = service.azureCredentials();
+
+ ConnectorPropertyFactory connectorPropertyFactory = CamelSinkAzureStorageQueuePropertyFactory
+ .basic()
+ .withConfiguration(TestQueueConfiguration.class.getName())
+ .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withAccessKey(azureCredentialsHolder.accountKey())
+ .withAccountName(azureCredentialsHolder.accountName())
+ .withOperation("sendMessage")
+ .withQueueName(queueName);
+
+ runTest(connectorPropertyFactory);
+ }
+
+
+ @Test
+ @Timeout(90)
+ public void testBasicSendReceiveUrl() throws InterruptedException, ExecutionException, IOException {
+ AzureCredentialsHolder azureCredentialsHolder = service.azureCredentials();
+
+ ConnectorPropertyFactory connectorPropertyFactory = CamelSinkAzureStorageQueuePropertyFactory
+ .basic()
+ .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withConfiguration(TestQueueConfiguration.class.getName())
+ .withUrl(azureCredentialsHolder.accountName() + "/" + queueName)
+ .append("accessKey", azureCredentialsHolder.accountKey())
+ .append("operation", "sendMessage")
+ .buildUrl();
+
+ runTest(connectorPropertyFactory);
+ }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueuePropertyFactory.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueuePropertyFactory.java
new file mode 100644
index 0000000..c6f22f1
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueuePropertyFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.queue.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+public class CamelSinkAzureStorageQueuePropertyFactory extends SinkConnectorPropertyFactory<CamelSinkAzureStorageQueuePropertyFactory> {
+
+ public CamelSinkAzureStorageQueuePropertyFactory withAccountName(String value) {
+ return setProperty("camel.sink.path.accountName", value);
+ }
+
+ public CamelSinkAzureStorageQueuePropertyFactory withQueueName(String value) {
+ return setProperty("camel.sink.path.queueName", value);
+ }
+
+ public CamelSinkAzureStorageQueuePropertyFactory withAccessKey(String value) {
+ return setProperty("camel.component.azure-storage-queue.accessKey", value);
+ }
+
+ public CamelSinkAzureStorageQueuePropertyFactory withConfiguration(String configurationClass) {
+ return setProperty("camel.component.azure-storage-queue.configuration", classRef(configurationClass));
+ }
+
+ public CamelSinkAzureStorageQueuePropertyFactory withOperation(String value) {
+ return setProperty("camel.component.azure-storage-queue.operation", value);
+ }
+
+ public EndpointUrlBuilder<CamelSinkAzureStorageQueuePropertyFactory> withUrl(String destinationName) {
+ String sinkUrl = String.format("azure-storage-queue://%s", destinationName);
+
+ return new EndpointUrlBuilder<>(this::withSinkUrl, sinkUrl);
+ }
+
+
+ public static CamelSinkAzureStorageQueuePropertyFactory basic() {
+ return new CamelSinkAzureStorageQueuePropertyFactory()
+ .withTasksMax(1)
+ .withName("CamelAzurestoragequeueSinkConnector")
+ .withConnectorClass("org.apache.camel.kafkaconnector.azurestoragequeue.CamelAzurestoragequeueSinkConnector")
+ .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+
+ }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/TestQueueConfiguration.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/TestQueueConfiguration.java
new file mode 100644
index 0000000..d91c43e
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/TestQueueConfiguration.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.queue.sink;
+
+import com.azure.storage.queue.QueueServiceClient;
+import org.apache.camel.component.azure.storage.queue.QueueConfiguration;
+import org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageClientUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestQueueConfiguration extends QueueConfiguration {
+ private static final Logger LOG = LoggerFactory.getLogger(TestQueueConfiguration.class);
+ private QueueServiceClient serviceClient;
+
+ @Override
+ public QueueServiceClient getServiceClient() {
+ LOG.info("Creating a custom QueueServiceClient");
+ if (serviceClient == null) {
+ serviceClient = AzureStorageClientUtils.getClient();
+ }
+
+ return serviceClient;
+ }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java
new file mode 100644
index 0000000..5087c07
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageClientUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.services;
+
+import com.azure.core.http.policy.HttpLogDetailLevel;
+import com.azure.core.http.policy.HttpLogOptions;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.queue.QueueServiceClient;
+import com.azure.storage.queue.QueueServiceClientBuilder;
+import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
+
+public final class AzureStorageClientUtils {
+
+ private AzureStorageClientUtils() {
+
+ }
+
+ public static QueueServiceClient getClient() {
+ String instanceType = System.getProperty("azure.instance.type");
+
+ String accountName = System.getProperty(AzureConfigs.ACCOUNT_NAME);
+ String accountKey = System.getProperty(AzureConfigs.ACCOUNT_KEY);
+ StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
+
+ String host = System.getProperty(AzureConfigs.HOST);
+ String port = System.getProperty(AzureConfigs.PORT);
+
+ String endpoint;
+
+ if (instanceType == null || instanceType.equals("local-azure-container")) {
+ endpoint = String.format("http://%s:%s/%s", host, port, accountName);
+ } else {
+ if (host == null || host.isEmpty()) {
+ endpoint = String.format("https://%s.queue.core.windows.net/%s", accountName, accountKey);
+ } else {
+ endpoint = String.format("http://%s:%s/%s", host, port, accountName);
+ }
+ }
+
+ return new QueueServiceClientBuilder()
+ .endpoint(endpoint)
+ .credential(credential)
+ .httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BODY_AND_HEADERS).setPrettyPrintBody(true))
+ .buildClient();
+ }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java
new file mode 100644
index 0000000..6aeae34
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueLocalContainerService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.services;
+
+import com.azure.storage.queue.QueueServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
+import org.apache.camel.kafkaconnector.azure.common.services.AzureServices;
+import org.apache.camel.kafkaconnector.azure.common.services.AzureStorageService;
+import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
+
+public class AzureStorageQueueLocalContainerService extends AzureStorageService<QueueServiceClient> {
+
+ @Override
+ public void initialize() {
+ super.initialize();
+
+ System.setProperty(AzureConfigs.ACCOUNT_NAME, getContainer().azureCredentials().accountName());
+ System.setProperty(AzureConfigs.ACCOUNT_KEY, getContainer().azureCredentials().accountKey());
+ System.setProperty(AzureConfigs.HOST, getContainer().getContainerIpAddress());
+ System.setProperty(AzureConfigs.PORT, String.valueOf(getContainer().getMappedPort(AzureServices.QUEUE_SERVICE)));
+ }
+
+ @Override
+ public AzureCredentialsHolder azureCredentials() {
+ return getContainer().azureCredentials();
+ }
+
+ @Override
+ public QueueServiceClient getClient() {
+ return AzureStorageClientUtils.getClient();
+ }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java
new file mode 100644
index 0000000..12912fd
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueRemoteService.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.services;
+
+import com.azure.storage.queue.QueueServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.AzureCredentialsHolder;
+import org.apache.camel.kafkaconnector.azure.common.services.AzureService;
+import org.apache.camel.kafkaconnector.azure.storage.common.AzureConfigs;
+
+public class AzureStorageQueueRemoteService implements AzureService<QueueServiceClient> {
+
+ @Override
+ public void initialize() {
+ // NO-OP
+ }
+
+ @Override
+ public void shutdown() {
+ // NO-OP
+ }
+
+ @Override
+ public AzureCredentialsHolder azureCredentials() {
+ // Default credentials for Azurite
+ return new AzureCredentialsHolder() {
+ @Override
+ public String accountName() {
+ return System.getProperty(AzureConfigs.ACCOUNT_NAME);
+ }
+
+ @Override
+ public String accountKey() {
+ return System.getProperty(AzureConfigs.ACCOUNT_KEY);
+ }
+ };
+ }
+
+ @Override
+ public QueueServiceClient getClient() {
+ return AzureStorageClientUtils.getClient();
+ }
+}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueServiceFactory.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueServiceFactory.java
new file mode 100644
index 0000000..86c321d
--- /dev/null
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageQueueServiceFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.services;
+
+import com.azure.storage.queue.QueueServiceClient;
+import org.apache.camel.kafkaconnector.azure.common.services.AzureService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class AzureStorageQueueServiceFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(AzureStorageQueueServiceFactory.class);
+
+ private AzureStorageQueueServiceFactory() {
+
+ }
+
+ public static AzureService<QueueServiceClient> createAzureService() {
+ String instanceType = System.getProperty("azure.instance.type");
+
+ if (instanceType == null || instanceType.equals("local-azure-container")) {
+ return new AzureStorageQueueLocalContainerService();
+ }
+
+ if (instanceType.equals("remote")) {
+ return new AzureStorageQueueRemoteService();
+ }
+
+ LOG.error("Azure instance must be one of 'local-azure-container' or 'remote");
+ throw new UnsupportedOperationException(String.format("Invalid Azure instance type: %s", instanceType));
+ }
+}
diff --git a/tests/itests-common/src/test/resources/log4j2.properties b/tests/itests-common/src/test/resources/log4j2.properties
index 72b615e..aa48d0a 100644
--- a/tests/itests-common/src/test/resources/log4j2.properties
+++ b/tests/itests-common/src/test/resources/log4j2.properties
@@ -88,4 +88,10 @@ logger.aws1.appenderRef.file.ref = file
logger.aws2.name = software.amazon.awssdk
logger.aws2.level = INFO
logger.aws2.additivity = false
-logger.aws2.appenderRef.file.ref = file
\ No newline at end of file
+logger.aws2.appenderRef.file.ref = file
+
+# Azure
+logger.azure.name = com.azure
+logger.azure.level = trace
+logger.azure.additivity = false
+logger.azure.appenderRef.file.ref = file
diff --git a/tests/pom.xml b/tests/pom.xml
index 23855c9..52f63e7 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -56,6 +56,8 @@
<module>itests-hdfs</module>
<module>itests-mongodb</module>
<module>itests-jdbc</module>
+ <module>itests-azure-common</module>
+ <module>itests-azure-storage-queue</module>
</modules>