You are viewing a plain-text version of this content. The canonical link that originally accompanied it is not included in this copy.
Posted to commits@camel.apache.org by or...@apache.org on 2020/11/02 14:38:57 UTC
[camel-kafka-connector] branch master updated: Update azure-tests
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 338567c Update azure-tests
338567c is described below
commit 338567cb921e3e8fe9874bc921e86ee7478ab73e
Author: Mikhail Abramov <ma...@redhat.com>
AuthorDate: Mon Nov 2 14:53:36 2020 +0100
Update azure-tests
---
.../blob/sink/CamelSinkAzureStorageBlobITCase.java | 39 ++++++++++++++++------
.../services/AzureStorageBlobClientUtils.java | 2 +-
.../sink/CamelSinkAzureStorageQueueITCase.java | 2 --
3 files changed, 30 insertions(+), 13 deletions(-)
diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
index 33e3c42..6066e71 100644
--- a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
@@ -17,11 +17,13 @@
package org.apache.camel.kafkaconnector.azure.storage.blob.sink;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobItem;
@@ -33,8 +35,8 @@ import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -42,7 +44,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
@RegisterExtension
@@ -69,26 +70,45 @@ public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
blobContainerName = "test-" + TestUtils.randomWithRange(1, 100);
blobContainerClient = client.createBlobContainer(blobContainerName);
- received = 0;
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (client != null) {
+ client.deleteBlobContainer(blobContainerName);
+ }
}
private boolean canConsume() {
return blobContainerClient.exists() && blobContainerClient.listBlobs().stream().count() > 0;
}
+
private void consume() {
LOG.debug("Created the consumer ...");
TestUtils.waitFor(this::canConsume);
- for (BlobItem blobContainerItem : blobContainerClient.listBlobs()) {
- String receivedFile = blobContainerItem.getName();
- assertTrue(sentData.containsKey(receivedFile));
+ int retries = 10;
+ do {
+ received = 0;
+ for (BlobItem blobContainerItem : blobContainerClient.listBlobs()) {
+ String receivedFile = blobContainerItem.getName();
+ BlobClient blobClient = blobContainerClient.getBlobClient(receivedFile);
- // TODO: check the file contents in the future
- received++;
- }
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ blobClient.download(outputStream);
+ String contentFile = outputStream.toString();
+
+ LOG.info("Received: \'{}\' with content: \'{}\'", receivedFile, contentFile);
+ assertEquals(sentData.get(receivedFile), contentFile, "Did not receive the same message that was sent");
+
+ received++;
+ }
+ retries--;
+ } while (received != 10 && retries > 0);
}
+
private void putRecords() {
Map<String, String> messageParameters = new HashMap<>();
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
@@ -122,7 +142,6 @@ public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
assertEquals(expect, received, "Did not receive the same amount of messages that were sent");
}
- @Disabled("Disabled due to issue #409")
@Test
@Timeout(90)
public void testBasicSendReceive() throws InterruptedException, ExecutionException, IOException {
diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobClientUtils.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobClientUtils.java
index 61f42fa..6333a2d 100644
--- a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobClientUtils.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobClientUtils.java
@@ -46,7 +46,7 @@ public final class AzureStorageBlobClientUtils {
endpoint = String.format("http://%s:%s/%s", host, port, accountName);
} else {
if (host == null || host.isEmpty()) {
- endpoint = String.format("https://%s.queue.core.windows.net/%s", accountName, accountKey);
+ endpoint = String.format("https://%s.blob.core.windows.net/%s", accountName, accountKey);
} else {
endpoint = String.format("http://%s:%s/%s", host, port, accountName);
}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
index d9c0e60..87874bc 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
@@ -35,7 +35,6 @@ import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -128,7 +127,6 @@ public class CamelSinkAzureStorageQueueITCase extends AbstractKafkaTest {
}
- @Disabled("Disabled due to issue #409")
@Test
@Timeout(90)
public void testBasicSendReceive() throws InterruptedException, ExecutionException, IOException {