You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/11/02 14:38:57 UTC

[camel-kafka-connector] branch master updated: Update azure-tests

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 338567c  Update azure-tests
338567c is described below

commit 338567cb921e3e8fe9874bc921e86ee7478ab73e
Author: Mikhail Abramov <ma...@redhat.com>
AuthorDate: Mon Nov 2 14:53:36 2020 +0100

    Update azure-tests
---
 .../blob/sink/CamelSinkAzureStorageBlobITCase.java | 39 ++++++++++++++++------
 .../services/AzureStorageBlobClientUtils.java      |  2 +-
 .../sink/CamelSinkAzureStorageQueueITCase.java     |  2 --
 3 files changed, 30 insertions(+), 13 deletions(-)

diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
index 33e3c42..6066e71 100644
--- a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
@@ -17,11 +17,13 @@
 
 package org.apache.camel.kafkaconnector.azure.storage.blob.sink;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
+import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
 import com.azure.storage.blob.models.BlobItem;
@@ -33,8 +35,8 @@ import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -42,7 +44,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
     @RegisterExtension
@@ -69,26 +70,45 @@ public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
 
         blobContainerName = "test-" +  TestUtils.randomWithRange(1, 100);
         blobContainerClient = client.createBlobContainer(blobContainerName);
-        received = 0;
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (client != null) {
+            client.deleteBlobContainer(blobContainerName);
+        }
     }
 
     private boolean canConsume() {
         return blobContainerClient.exists() && blobContainerClient.listBlobs().stream().count() > 0;
     }
 
+
     private void consume() {
         LOG.debug("Created the consumer ...");
         TestUtils.waitFor(this::canConsume);
 
-        for (BlobItem blobContainerItem : blobContainerClient.listBlobs()) {
-            String receivedFile = blobContainerItem.getName();
-            assertTrue(sentData.containsKey(receivedFile));
+        int retries = 10;
+        do {
+            received = 0;
+            for (BlobItem blobContainerItem : blobContainerClient.listBlobs()) {
+                String receivedFile = blobContainerItem.getName();
+                BlobClient blobClient = blobContainerClient.getBlobClient(receivedFile);
 
-            // TODO: check the file contents in the future
-            received++;
-        }
+                ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+                blobClient.download(outputStream);
+                String contentFile = outputStream.toString();
+
+                LOG.info("Received: \'{}\' with content: \'{}\'", receivedFile, contentFile);
+                assertEquals(sentData.get(receivedFile), contentFile, "Did not receive the same message that was sent");
+
+                received++;
+            }
+            retries--;
+        } while (received != 10 && retries > 0);
     }
 
+
     private void putRecords() {
         Map<String, String> messageParameters = new HashMap<>();
         KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
@@ -122,7 +142,6 @@ public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
         assertEquals(expect, received, "Did not receive the same amount of messages that were sent");
     }
 
-    @Disabled("Disabled due to issue #409")
     @Test
     @Timeout(90)
     public void testBasicSendReceive() throws InterruptedException, ExecutionException, IOException {
diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobClientUtils.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobClientUtils.java
index 61f42fa..6333a2d 100644
--- a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobClientUtils.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/services/AzureStorageBlobClientUtils.java
@@ -46,7 +46,7 @@ public final class AzureStorageBlobClientUtils {
             endpoint = String.format("http://%s:%s/%s", host, port, accountName);
         } else {
             if (host == null || host.isEmpty()) {
-                endpoint = String.format("https://%s.queue.core.windows.net/%s", accountName, accountKey);
+                endpoint = String.format("https://%s.blob.core.windows.net/%s", accountName, accountKey);
             } else {
                 endpoint = String.format("http://%s:%s/%s", host, port, accountName);
             }
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
index d9c0e60..87874bc 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
@@ -35,7 +35,6 @@ import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -128,7 +127,6 @@ public class CamelSinkAzureStorageQueueITCase extends AbstractKafkaTest {
     }
 
 
-    @Disabled("Disabled due to issue #409")
     @Test
     @Timeout(90)
     public void testBasicSendReceive() throws InterruptedException, ExecutionException, IOException {