You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/04 14:12:51 UTC

[camel-kafka-connector] 08/22: Convert the File tests to the new reusable sink test base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 6cdd1de1ff135a569a5b913d77f4932fa32afb86
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 12:15:23 2021 +0100

    Convert the File tests to the new reusable sink test base class
---
 .../file/sink/CamelSinkFileITCase.java             | 122 ++++++++++-----------
 1 file changed, 60 insertions(+), 62 deletions(-)

diff --git a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java
index 2dbf459..ead6c58 100644
--- a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java
+++ b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java
@@ -27,13 +27,12 @@ import java.nio.file.StandardWatchEventKinds;
 import java.nio.file.WatchEvent;
 import java.nio.file.WatchKey;
 import java.nio.file.WatchService;
-import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -43,18 +42,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @Testcontainers
-public class CamelSinkFileITCase extends AbstractKafkaTest {
+public class CamelSinkFileITCase extends CamelSinkTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkFileITCase.class);
 
     private static final String SINK_DIR = CamelSinkFileITCase.class.getResource(".").getPath();
     private static final String FILENAME = "test.txt";
 
+    private String topicName;
     private final int expect = 1;
 
     @Override
@@ -64,6 +64,7 @@ public class CamelSinkFileITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
         cleanup();
     }
 
@@ -79,74 +80,46 @@ public class CamelSinkFileITCase extends AbstractKafkaTest {
         }
     }
 
-    private void putRecords() {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            try {
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test");
-            } catch (ExecutionException e) {
-                LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-            } catch (InterruptedException e) {
-                break;
-            }
-        }
+    @Override
+    protected String testMessageContent(int current) {
+        return "test";
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException, IOException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        putRecords();
-
-        LOG.debug("Created the consumer ... About to receive messages");
-
-        File sinkFile = new File(SINK_DIR, FILENAME);
-        File doneFile = new File(SINK_DIR, FILENAME + ".done");
-
-        waitForFile(sinkFile, doneFile);
-
-        assertTrue(sinkFile.exists(), String.format("The file %s does not exist", sinkFile.getPath()));
-
-        checkFileContents(sinkFile);
-
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
     }
 
-    @Test
-    @Timeout(90)
-    public void testBasicSendReceive() {
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelFilePropertyFactory.basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withDirectoryName(SINK_DIR)
-                    .withFileName(FILENAME)
-                    .withDoneFileName(FILENAME + ".done");
+            File sinkFile = new File(SINK_DIR, FILENAME);
+            File doneFile = new File(SINK_DIR, FILENAME + ".done");
 
-            runTest(connectorPropertyFactory);
-
-        } catch (Exception e) {
-            LOG.error("HTTP test failed: {}", e.getMessage(), e);
+            waitForFile(sinkFile, doneFile);
+        } catch (InterruptedException e) {
+            fail(e.getMessage());
+        } catch (IOException e) {
             fail(e.getMessage());
+        } finally {
+            latch.countDown();
         }
     }
 
-    @Test
-    @Timeout(90)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelFilePropertyFactory.basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withUrl(SINK_DIR)
-                        .append("fileName", FILENAME)
-                        .append("doneFileName", FILENAME + ".done")
-                        .buildUrl();
-
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            File sinkFile = new File(SINK_DIR, FILENAME);
 
-            runTest(connectorPropertyFactory);
+            assertTrue(sinkFile.exists(), String.format("The file %s does not exist", sinkFile.getPath()));
 
-        } catch (Exception e) {
-            LOG.error("HTTP test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
+            try {
+                checkFileContents(sinkFile);
+            } catch (IOException e) {
+                fail(e.getMessage());
+            }
+        } else {
+            fail("Failed to receive the messages within the specified time");
         }
     }
 
@@ -212,4 +185,29 @@ public class CamelSinkFileITCase extends AbstractKafkaTest {
             retries--;
         } while (!doneFile.exists() && retries > 0);
     }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelFilePropertyFactory.basic()
+                .withTopics(topicName)
+                .withDirectoryName(SINK_DIR)
+                .withFileName(FILENAME)
+                .withDoneFileName(FILENAME + ".done");
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelFilePropertyFactory.basic()
+                .withTopics(topicName)
+                .withUrl(SINK_DIR)
+                .append("fileName", FILENAME)
+                .append("doneFileName", FILENAME + ".done")
+                .buildUrl();
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
 }