You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/03 17:24:42 UTC

[camel-kafka-connector] 09/18: Convert the HDFS tests to the new reusable sink test base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 40ac033448e4a051545670fd53a2fc10b715d23f
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 12:27:06 2021 +0100

    Convert the HDFS tests to the new reusable sink test base class
---
 .../hdfs/sink/CamelSinkHDFSITCase.java             | 95 ++++++++++++----------
 1 file changed, 54 insertions(+), 41 deletions(-)

diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
index 00234b5..c7e7cc3 100644
--- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
+++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
@@ -19,10 +19,12 @@ package org.apache.camel.kafkaconnector.hdfs.sink;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.hdfs.utils.HDFSEasy;
 import org.apache.camel.test.infra.hdfs.v2.services.HDFSService;
@@ -38,13 +40,12 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkHDFSITCase extends AbstractKafkaTest {
+public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static HDFSService hdfsService = HDFSServiceFactory.createService();
 
@@ -52,6 +53,7 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest {
 
     private HDFSEasy hdfsEasy;
     private Path currentBasePath;
+    private String topicName;
 
     private final int expect = 10;
 
@@ -60,9 +62,9 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest {
         return new String[] {"camel-hdfs-kafka-connector"};
     }
 
-
     @BeforeEach
     public void setUp() throws IOException, URISyntaxException {
+        topicName = getTopicForTest(this);
         hdfsEasy = new HDFSEasy(hdfsService.getHDFSHost(), hdfsService.getPort());
 
         String currentPath = "/test" + TestUtils.randomWithRange(0, 256) + "/";
@@ -81,54 +83,51 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest {
         }
     }
 
-    private boolean filesCreated() {
-        return hdfsEasy.filesCreated(currentBasePath, expect);
+    @Override
+    protected String testMessageContent(int current) {
+        return "Sink test message: " + current;
     }
 
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
-    private String sendKafkaMessages(String baseMessage, int count) throws java.util.concurrent.ExecutionException, InterruptedException {
-        LOG.info("Sending data to Kafka");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < count; i++) {
-            kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), baseMessage + i);
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            TestUtils.waitFor(this::filesCreated);
+        } finally {
+            latch.countDown();
         }
-        return baseMessage;
     }
 
-    @Test
-    @Timeout(90)
-    public void testBasicSendReceive() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelHDFSPropertyFactory
-                    .basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withHostname(hdfsService.getHDFSHost())
-                    .withPort(hdfsService.getPort())
-                    .withPath(currentBasePath.getName())
-                    .withSplitStrategy("MESSAGES:1,IDLE:1000");
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            boolean filesCreated = filesCreated();
+            assertTrue(filesCreated, "The files were not created on the remote host");
 
-            connectorPropertyFactory.log();
-            getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+            try {
+                assertEquals(hdfsEasy.countFiles(currentBasePath), expect, "The number of files created vs expected do not match");
 
-            final String baseMessage = "Sink test message: ";
-            sendKafkaMessages(baseMessage, expect);
+                final String baseMessage = "Sink test message: ";
+                hdfsEasy.listFiles(currentBasePath)
+                        .stream()
+                        .filter(f -> !f.getPath().getName().contains(".opened"))
+                        .forEach(f -> printFile(f, baseMessage));
+            } catch (IOException e) {
+                fail(e.getMessage());
+            }
 
-            boolean filesCreated = TestUtils.waitFor(this::filesCreated);
-            assertTrue(filesCreated, "The files were not created on the remote host");
-            assertEquals(hdfsEasy.countFiles(currentBasePath), expect, "The number of files created vs expected do not match");
-            hdfsEasy.listFiles(currentBasePath)
-                    .stream()
-                    .filter(f -> !f.getPath().getName().contains(".opened"))
-                    .forEach(f -> printFile(f, baseMessage));
-
-        } catch (Exception e) {
-            LOG.error("HDFS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
+        } else {
+            fail("Failed to receive the messages within the specified time");
         }
     }
 
-
+    private boolean filesCreated() {
+        return hdfsEasy.filesCreated(currentBasePath, expect);
+    }
 
     private void printFile(LocatedFileStatus f, String matchString) {
         try {
@@ -142,4 +141,18 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest {
             fail("I/O error: " + e.getMessage());
         }
     }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelHDFSPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withHostname(hdfsService.getHDFSHost())
+                .withPort(hdfsService.getPort())
+                .withPath(currentBasePath.getName())
+                .withSplitStrategy("MESSAGES:1,IDLE:1000");
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
 }