Posted to commits@camel.apache.org by or...@apache.org on 2021/02/04 14:12:46 UTC

[camel-kafka-connector] 03/22: Convert the Azure storage blob tests to the new reusable sink test base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e84c8c902e75a307ef766b3508ebe85c1fd70817
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 11:16:48 2021 +0100

    Convert the Azure storage blob tests to the new reusable sink test base class
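
    For readers not familiar with the new base class, the skeleton below is a minimal
    sketch (not part of this commit) of the pattern the converted test now follows:
    a subclass supplies the message body and Camel headers, reads back from the target
    system when asked, and verifies the count once the latch is released. The class
    name ExampleSinkITCase, the header name "ExampleTargetName" and the helper
    readBackFromTargetSystem() are hypothetical and only illustrate the hooks visible
    in this diff.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import org.apache.camel.kafkaconnector.CamelSinkTask;
    import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;

    import static org.junit.jupiter.api.Assertions.assertEquals;
    import static org.junit.jupiter.api.Assertions.fail;

    // Hypothetical example of a sink test built on the new reusable base class.
    public abstract class ExampleSinkITCase extends CamelSinkTestSupport {
        private final int expect = 10;
        private int received;

        // Body of each record produced to the Kafka topic.
        @Override
        protected String testMessageContent(int current) {
            return "test " + current + " data";
        }

        // Camel headers attached to each record; the header name is made up for illustration.
        @Override
        protected Map<String, String> messageHeaders(String text, int current) {
            Map<String, String> headers = new HashMap<>();
            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "ExampleTargetName", "item-" + current);
            return headers;
        }

        // Hook for reading back whatever the sink connector wrote to the target system.
        protected abstract int readBackFromTargetSystem();

        @Override
        protected void consumeMessages(CountDownLatch latch) {
            try {
                received = readBackFromTargetSystem();
            } finally {
                // Always release the latch so verifyMessages() cannot block past its timeout.
                latch.countDown();
            }
        }

        @Override
        protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
            if (latch.await(120, TimeUnit.SECONDS)) {
                assertEquals(expect, received, "Didn't process the expected amount of messages");
            } else {
                fail("Failed to receive the messages within the specified time");
            }
        }
    }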
---
 .../blob/sink/CamelSinkAzureStorageBlobITCase.java | 98 +++++++++++-----------
 .../common/test/CamelSinkTestSupport.java          |  6 +-
 2 files changed, 56 insertions(+), 48 deletions(-)
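
 The runTest(factory, topicName, expect) call that replaces the per-test runTest/putRecords
 helpers lives in CamelSinkTestSupport, which is only partially shown at the end of this
 diff. A rough sketch of how that orchestration is expected to work, assuming a
 single-count CountDownLatch handed to the consume and verify hooks and reusing the steps
 from the removed per-test runTest (ExecutorService imports omitted):

    // Sketch only; the actual CamelSinkTestSupport.runTest implementation is not shown in this excerpt.
    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topicName, int count) throws Exception {
        connectorPropertyFactory.log();
        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);

        // Produce the records for the sink connector to pick up from the Kafka topic ...
        produceMessages(topicName, count);

        // ... then read them back from the target system on another thread and verify the count.
        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(() -> consumeMessages(latch));

        verifyMessages(latch);
    }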

diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
index 2b46470..1bbf9f1 100644
--- a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
@@ -18,19 +18,18 @@
 package org.apache.camel.kafkaconnector.azure.storage.blob.sink;
 
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
 import com.azure.storage.blob.models.BlobItem;
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.azure.common.AzureCredentialsHolder;
 import org.apache.camel.test.infra.azure.common.services.AzureService;
@@ -46,18 +45,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
+public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static AzureService service = AzureStorageBlobServiceFactory.createAzureService();
-
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAzureStorageBlobITCase.class);
 
     private BlobServiceClient client;
     private BlobContainerClient blobContainerClient;
     private String blobContainerName;
     private Map<String, String> sentData = new HashMap<>();
+    private String topicName;
 
     private int expect = 10;
     private int received;
@@ -69,6 +69,7 @@ public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUpBlob() {
+        topicName = getTopicForTest(this);
         client = AzureStorageBlobClientUtils.getClient();
 
         blobContainerName = "test-" +  TestUtils.randomWithRange(1, 100);
@@ -82,8 +83,45 @@ public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
         }
     }
 
+    @Override
+    protected String testMessageContent(int current) {
+        return "test " + current + " data";
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> messageParameters = new HashMap<>();
+
+        String sentFile = "test " + current;
+
+        sentData.put(sentFile, testMessageContent(current));
+
+        messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageBlobBlobName", sentFile);
+
+        return messageParameters;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            consume();
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(120, TimeUnit.SECONDS)) {
+            assertEquals(expect, received,
+                    "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
     private boolean canConsume() {
-        return blobContainerClient.exists() && blobContainerClient.listBlobs().stream().count() > 0;
+        return blobContainerClient.exists() && blobContainerClient.listBlobs().stream().count() >= expect;
     }
 
 
@@ -111,71 +149,37 @@ public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
         } while (received != 10 && retries > 0);
     }
 
-
-    private void putRecords() {
-        Map<String, String> messageParameters = new HashMap<>();
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            try {
-                String sentFile = "test " + i;
-                String sentText = "test " + i + " data";
-
-                sentData.put(sentFile, sentText);
-
-                messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageBlobBlobName", sentFile);
-
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), sentText, messageParameters);
-            } catch (ExecutionException e) {
-                LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-            } catch (InterruptedException e) {
-                break;
-            }
-        }
-    }
-
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-
-        putRecords();
-
-        consume();
-
-        assertEquals(expect, received, "Did not receive the same amount of messages that were sent");
-    }
-
     @Test
     @Timeout(90)
-    public void testBasicSendReceive() throws InterruptedException, ExecutionException, IOException {
+    public void testBasicSendReceive() throws Exception {
         AzureCredentialsHolder azureCredentialsHolder = service.azureCredentials();
 
         ConnectorPropertyFactory factory = CamelSinkAzureStorageBlobPropertyFactory
                 .basic()
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withTopics(topicName)
                 .withConfiguration(TestBlobConfiguration.class.getName())
                 .withAccessKey(azureCredentialsHolder.accountKey())
                 .withAccountName(azureCredentialsHolder.accountName())
                 .withContainerName(blobContainerName)
                 .withOperation("uploadBlockBlob");
 
-        runTest(factory);
+        runTest(factory, topicName, expect);
     }
 
     @Test
     @Timeout(90)
-    public void testBasicSendReceiveUrl() throws InterruptedException, ExecutionException, IOException {
+    public void testBasicSendReceiveUrl() throws Exception {
         AzureCredentialsHolder azureCredentialsHolder = service.azureCredentials();
 
         ConnectorPropertyFactory factory = CamelSinkAzureStorageBlobPropertyFactory
                 .basic()
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withTopics(topicName)
                 .withConfiguration(TestBlobConfiguration.class.getName())
                 .withUrl(azureCredentialsHolder.accountName() + "/" + blobContainerName)
                     .append("accessKey", azureCredentialsHolder.accountKey())
                     .append("operation", "uploadBlockBlob")
                     .buildUrl();
 
-        runTest(factory);
+        runTest(factory, topicName, expect);
     }
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
index d70c3d4..9f8460f 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
@@ -36,12 +36,16 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
 
     protected abstract Map<String, String> messageHeaders(String text, int current);
 
+    protected String testMessageContent(int current) {
+        return  "Sink test message " + current;
+    }
+
     protected void produceMessages(String topicName, int count)  {
         try {
             KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
 
             for (int i = 0; i < count; i++) {
-                String message = "Sink test message " + i;
+                String message = testMessageContent(i);
                 Map<String, String> headers = messageHeaders(message, i);
 
                 if (headers == null) {