You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/04 14:13:01 UTC

[camel-kafka-connector] 18/22: Convert the Syslog tests to the new reusable sink test base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit ad4a4f8b12765b316b0018e10e269b0154055db9
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 15:59:57 2021 +0100

    Convert the Syslog tests to the new reusable sink test base class
---
 .../syslog/sink/CamelSinkSyslogITCase.java         | 71 ++++++++++++----------
 1 file changed, 39 insertions(+), 32 deletions(-)

diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
index 9273964..1b9f942 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
@@ -16,19 +16,19 @@
  */
 package org.apache.camel.kafkaconnector.syslog.sink;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.syslog.services.SyslogService;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -39,15 +39,14 @@ import static org.junit.jupiter.api.Assertions.fail;
  * messages
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkSyslogITCase extends AbstractKafkaTest {
+public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
     private static final int FREE_PORT = NetworkUtils.getFreePort("localhost", NetworkUtils.Protocol.UDP);
+    private static final String TEST_TXT = "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!";
 
     @RegisterExtension
     public static SyslogService syslogService = new SyslogService("udp", "//localhost", FREE_PORT);
 
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSyslogITCase.class);
-
-    private int received;
+    private String topicName;
     private final int expect = 1;
 
     @Override
@@ -57,36 +56,44 @@ public class CamelSinkSyslogITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        received = 0;
+        topicName = getTopicForTest(this);
     }
 
-    private void runBasicProduceTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+    @Override
+    protected String testMessageContent(int current) {
+        return TEST_TXT;
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
-        LOG.debug("Creating the producer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!");
-        LOG.debug("Created the producer ...");
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        latch.countDown();
+    }
 
-        assertEquals("<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!", syslogService.getFirstExchangeToBeReceived().getIn().getBody(String.class));
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            assertEquals(TEST_TXT, syslogService.getFirstExchangeToBeReceived().getIn().getBody(String.class));
+        } else {
+            fail("Timed out wait for data to be added to the Kafka cluster");
+        }
     }
 
+
     @Test
     @Timeout(90)
-    public void testBasicReceive() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
-                    .basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withHost("localhost")
-                    .withPort(FREE_PORT)
-                    .withProtocol("udp");
-
-            runBasicProduceTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("Syslog test failed: {} {}", e.getMessage(), e);
-            fail(e.getMessage(), e);
-        }
+    public void testBasicReceive() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withHost("localhost")
+                .withPort(FREE_PORT)
+                .withProtocol("udp");
+
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }