You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/03 17:24:51 UTC
[camel-kafka-connector] 18/18: Convert the Syslog tests to the new reusable sink test base class
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit b6a1bb9320d57b6d3408f2d4e17c426c71bd03b3
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 15:59:57 2021 +0100
Convert the Syslog tests to the new reusable sink test base class
---
.../syslog/sink/CamelSinkSyslogITCase.java | 71 ++++++++++++----------
1 file changed, 39 insertions(+), 32 deletions(-)
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
index 9273964..1b9f942 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
@@ -16,19 +16,19 @@
*/
package org.apache.camel.kafkaconnector.syslog.sink;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.syslog.services.SyslogService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
@@ -39,15 +39,14 @@ import static org.junit.jupiter.api.Assertions.fail;
* messages
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkSyslogITCase extends AbstractKafkaTest {
+public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
private static final int FREE_PORT = NetworkUtils.getFreePort("localhost", NetworkUtils.Protocol.UDP);
+ private static final String TEST_TXT = "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!";
@RegisterExtension
public static SyslogService syslogService = new SyslogService("udp", "//localhost", FREE_PORT);
- private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSyslogITCase.class);
-
- private int received;
+ private String topicName;
private final int expect = 1;
@Override
@@ -57,36 +56,44 @@ public class CamelSinkSyslogITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() {
- received = 0;
+ topicName = getTopicForTest(this);
}
- private void runBasicProduceTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
- connectorPropertyFactory.log();
- getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+ @Override
+ protected String testMessageContent(int current) {
+ return TEST_TXT;
+ }
+
+ @Override
+ protected Map<String, String> messageHeaders(String text, int current) {
+ return null;
+ }
- LOG.debug("Creating the producer ...");
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!");
- LOG.debug("Created the producer ...");
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
+ latch.countDown();
+ }
- assertEquals("<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!", syslogService.getFirstExchangeToBeReceived().getIn().getBody(String.class));
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ if (latch.await(30, TimeUnit.SECONDS)) {
+ assertEquals(TEST_TXT, syslogService.getFirstExchangeToBeReceived().getIn().getBody(String.class));
+ } else {
+ fail("Timed out wait for data to be added to the Kafka cluster");
+ }
}
+
@Test
@Timeout(90)
- public void testBasicReceive() {
- try {
- ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
- .basic()
- .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
- .withHost("localhost")
- .withPort(FREE_PORT)
- .withProtocol("udp");
-
- runBasicProduceTest(connectorPropertyFactory);
- } catch (Exception e) {
- LOG.error("Syslog test failed: {} {}", e.getMessage(), e);
- fail(e.getMessage(), e);
- }
+ public void testBasicReceive() throws Exception {
+ ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
+ .basic()
+ .withTopics(topicName)
+ .withHost("localhost")
+ .withPort(FREE_PORT)
+ .withProtocol("udp");
+
+ runTest(connectorPropertyFactory, topicName, expect);
}
}