You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/04 14:12:52 UTC
[camel-kafka-connector] 09/22: Convert the HDFS tests to the new reusable sink test base class
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 23310487ad3ca5fa60d7885afb41eb28ae5f79de
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 12:27:06 2021 +0100
Convert the HDFS tests to the new reusable sink test base class
---
.../hdfs/sink/CamelSinkHDFSITCase.java | 95 ++++++++++++----------
1 file changed, 54 insertions(+), 41 deletions(-)
diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
index 00234b5..c7e7cc3 100644
--- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
+++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
@@ -19,10 +19,12 @@ package org.apache.camel.kafkaconnector.hdfs.sink;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.hdfs.utils.HDFSEasy;
import org.apache.camel.test.infra.hdfs.v2.services.HDFSService;
@@ -38,13 +40,12 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkHDFSITCase extends AbstractKafkaTest {
+public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
@RegisterExtension
public static HDFSService hdfsService = HDFSServiceFactory.createService();
@@ -52,6 +53,7 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest {
private HDFSEasy hdfsEasy;
private Path currentBasePath;
+ private String topicName;
private final int expect = 10;
@@ -60,9 +62,9 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest {
return new String[] {"camel-hdfs-kafka-connector"};
}
-
@BeforeEach
public void setUp() throws IOException, URISyntaxException {
+ topicName = getTopicForTest(this);
hdfsEasy = new HDFSEasy(hdfsService.getHDFSHost(), hdfsService.getPort());
String currentPath = "/test" + TestUtils.randomWithRange(0, 256) + "/";
@@ -81,54 +83,51 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest {
}
}
- private boolean filesCreated() {
- return hdfsEasy.filesCreated(currentBasePath, expect);
+ @Override
+ protected String testMessageContent(int current) {
+ return "Sink test message: " + current;
}
+ @Override
+ protected Map<String, String> messageHeaders(String text, int current) {
+ return null;
+ }
- private String sendKafkaMessages(String baseMessage, int count) throws java.util.concurrent.ExecutionException, InterruptedException {
- LOG.info("Sending data to Kafka");
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
- for (int i = 0; i < count; i++) {
- kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), baseMessage + i);
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
+ try {
+ TestUtils.waitFor(this::filesCreated);
+ } finally {
+ latch.countDown();
}
- return baseMessage;
}
- @Test
- @Timeout(90)
- public void testBasicSendReceive() {
- try {
- ConnectorPropertyFactory connectorPropertyFactory = CamelHDFSPropertyFactory
- .basic()
- .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
- .withHostname(hdfsService.getHDFSHost())
- .withPort(hdfsService.getPort())
- .withPath(currentBasePath.getName())
- .withSplitStrategy("MESSAGES:1,IDLE:1000");
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ if (latch.await(30, TimeUnit.SECONDS)) {
+ boolean filesCreated = filesCreated();
+ assertTrue(filesCreated, "The files were not created on the remote host");
- connectorPropertyFactory.log();
- getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+ try {
+ assertEquals(hdfsEasy.countFiles(currentBasePath), expect, "The number of files created vs expected do not match");
- final String baseMessage = "Sink test message: ";
- sendKafkaMessages(baseMessage, expect);
+ final String baseMessage = "Sink test message: ";
+ hdfsEasy.listFiles(currentBasePath)
+ .stream()
+ .filter(f -> !f.getPath().getName().contains(".opened"))
+ .forEach(f -> printFile(f, baseMessage));
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
- boolean filesCreated = TestUtils.waitFor(this::filesCreated);
- assertTrue(filesCreated, "The files were not created on the remote host");
- assertEquals(hdfsEasy.countFiles(currentBasePath), expect, "The number of files created vs expected do not match");
- hdfsEasy.listFiles(currentBasePath)
- .stream()
- .filter(f -> !f.getPath().getName().contains(".opened"))
- .forEach(f -> printFile(f, baseMessage));
-
- } catch (Exception e) {
- LOG.error("HDFS test failed: {}", e.getMessage(), e);
- fail(e.getMessage());
+ } else {
+ fail("Failed to receive the messages within the specified time");
}
}
-
+ private boolean filesCreated() {
+ return hdfsEasy.filesCreated(currentBasePath, expect);
+ }
private void printFile(LocatedFileStatus f, String matchString) {
try {
@@ -142,4 +141,18 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest {
fail("I/O error: " + e.getMessage());
}
}
+
+ @Test
+ @Timeout(90)
+ public void testBasicSendReceive() throws Exception {
+ ConnectorPropertyFactory connectorPropertyFactory = CamelHDFSPropertyFactory
+ .basic()
+ .withTopics(topicName)
+ .withHostname(hdfsService.getHDFSHost())
+ .withPort(hdfsService.getPort())
+ .withPath(currentBasePath.getName())
+ .withSplitStrategy("MESSAGES:1,IDLE:1000");
+
+ runTest(connectorPropertyFactory, topicName, expect);
+ }
}