You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/15 10:39:05 UTC
[camel-kafka-connector] 04/10: Rework the syslog integration tests
to let JUnit handle camel's lifecycle in the test
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit dd5ce04329c4601ae32027971335d0f051c27ed3
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Feb 12 16:11:18 2021 +0100
Rework the syslog integration tests to let JUnit handle camel's lifecycle in the test
This fixes a few false negatives related to start/stop the CamelContext in the test
---
.../syslog/services/RouteConfigurator.java | 24 ++++++++++
...slogService.java => SinkRouteConfigurator.java} | 41 +++++++---------
.../syslog/services/SourceRouteConfigurator.java | 54 ++++++++++++++++++++++
.../syslog/services/SyslogService.java | 49 +++++++++++---------
.../syslog/sink/CamelSinkSyslogITCase.java | 19 ++++++--
.../syslog/source/CamelSourceSyslogITCase.java | 47 +++----------------
6 files changed, 145 insertions(+), 89 deletions(-)
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/RouteConfigurator.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/RouteConfigurator.java
new file mode 100644
index 0000000..4b20f72
--- /dev/null
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/RouteConfigurator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.syslog.services;
+
+import org.apache.camel.CamelContext;
+
+public interface RouteConfigurator {
+ void configure(CamelContext camelContext) throws Exception;
+}
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SinkRouteConfigurator.java
similarity index 51%
copy from tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java
copy to tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SinkRouteConfigurator.java
index 3bc07b3..83f1a30 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SinkRouteConfigurator.java
@@ -14,49 +14,40 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.camel.kafkaconnector.syslog.services;
import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.syslog.SyslogDataFormat;
import org.apache.camel.component.syslog.netty.Rfc5425FrameDecoder;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.junit.jupiter.api.extension.AfterAllCallback;
-import org.junit.jupiter.api.extension.BeforeAllCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class SyslogService implements BeforeAllCallback, AfterAllCallback {
- private static final CamelContext CAMEL_CONTEXT = new DefaultCamelContext();
+public class SinkRouteConfigurator implements RouteConfigurator {
+ private static final Logger LOG = LoggerFactory.getLogger(SinkRouteConfigurator.class);
- private static String protocol;
- private static String host;
- private static int port;
+ private final String protocol;
+ private final String host;
+ private final int port;
- public SyslogService(String protocol, String host, int port) {
+ public SinkRouteConfigurator(String protocol, String host, int port) {
this.protocol = protocol;
this.host = host;
this.port = port;
}
@Override
- public void beforeAll(ExtensionContext context) throws Exception {
- CAMEL_CONTEXT.getRegistry().bind("decoder", new Rfc5425FrameDecoder());
- CAMEL_CONTEXT.addRoutes(new RouteBuilder() {
+ public void configure(CamelContext camelContext) throws Exception {
+ camelContext.getRegistry().bind("decoder", new Rfc5425FrameDecoder());
+
+ LOG.debug("Adding routes");
+ camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() {
- from("netty:" + protocol + ":" + host + ":" + port + "?sync=false&decoders=#decoder").unmarshal(new SyslogDataFormat()).to("seda:syslog");
+ fromF("netty:%s://%s:%d?sync=false&decoders=#decoder", protocol, host, port)
+ .unmarshal(new SyslogDataFormat()).to("seda:syslog");
}
});
- CAMEL_CONTEXT.start();
- }
-
- @Override
- public void afterAll(ExtensionContext context) {
- CAMEL_CONTEXT.stop();
- }
-
- public Exchange getFirstExchangeToBeReceived() {
- return CAMEL_CONTEXT.createConsumerTemplate().receive("seda:syslog", 10000L);
}
}
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SourceRouteConfigurator.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SourceRouteConfigurator.java
new file mode 100644
index 0000000..a088940
--- /dev/null
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SourceRouteConfigurator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.syslog.services;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.syslog.SyslogDataFormat;
+import org.apache.camel.component.syslog.netty.Rfc5425Encoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SourceRouteConfigurator implements RouteConfigurator {
+ private static final Logger LOG = LoggerFactory.getLogger(SourceRouteConfigurator.class);
+
+ private final String protocol;
+ private final String host;
+ private final int port;
+
+ public SourceRouteConfigurator(String protocol, String host, int port) {
+ this.protocol = protocol;
+ this.host = host;
+ this.port = port;
+ }
+
+ @Override
+ public void configure(CamelContext camelContext) throws Exception {
+ camelContext.getRegistry().bind("encoder", new Rfc5425Encoder());
+
+ LOG.debug("Adding routes");
+ camelContext.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:test")
+ .marshal(new SyslogDataFormat())
+ .toF("netty:%s://%s:%d?sync=false&encoders=#encoder&useByteBuf=true", protocol, host, port);
+ }
+ });
+ }
+}
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java
index 3bc07b3..53195ca 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java
@@ -18,45 +18,52 @@ package org.apache.camel.kafkaconnector.syslog.services;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.syslog.SyslogDataFormat;
-import org.apache.camel.component.syslog.netty.Rfc5425FrameDecoder;
import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.infra.common.TestUtils;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
public class SyslogService implements BeforeAllCallback, AfterAllCallback {
- private static final CamelContext CAMEL_CONTEXT = new DefaultCamelContext();
+ private final CamelContext camelContext = new DefaultCamelContext();
- private static String protocol;
- private static String host;
- private static int port;
+ private final RouteConfigurator routeConfigurator;
- public SyslogService(String protocol, String host, int port) {
- this.protocol = protocol;
- this.host = host;
- this.port = port;
+ public SyslogService(RouteConfigurator routeConfigurator) {
+ this.routeConfigurator = routeConfigurator;
}
@Override
public void beforeAll(ExtensionContext context) throws Exception {
- CAMEL_CONTEXT.getRegistry().bind("decoder", new Rfc5425FrameDecoder());
- CAMEL_CONTEXT.addRoutes(new RouteBuilder() {
- @Override
- public void configure() {
- from("netty:" + protocol + ":" + host + ":" + port + "?sync=false&decoders=#decoder").unmarshal(new SyslogDataFormat()).to("seda:syslog");
- }
- });
- CAMEL_CONTEXT.start();
+ routeConfigurator.configure(camelContext);
+
+ camelContext.start();
+ TestUtils.waitFor(camelContext::isStarted);
}
@Override
public void afterAll(ExtensionContext context) {
- CAMEL_CONTEXT.stop();
+ camelContext.stop();
+ TestUtils.waitFor(camelContext::isStopped);
+ }
+
+ public CamelContext getCamelContext() {
+ return camelContext;
}
public Exchange getFirstExchangeToBeReceived() {
- return CAMEL_CONTEXT.createConsumerTemplate().receive("seda:syslog", 10000L);
+ return camelContext.createConsumerTemplate().receive("seda:syslog", 10000L);
+ }
+
+ public static SyslogService sinkSyslogServiceFactory(String protocol, String host, int port) {
+ SinkRouteConfigurator sinkRouteConfigurator = new SinkRouteConfigurator(protocol, host, port);
+
+ return new SyslogService(sinkRouteConfigurator);
+ }
+
+ public static SyslogService sourceSyslogServiceFactory(String protocol, String host, int port) {
+ SourceRouteConfigurator sourceRouteConfigurator = new SourceRouteConfigurator(protocol, host, port);
+
+ return new SyslogService(sourceRouteConfigurator);
}
}
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
index 78eb2f4..a2620dc 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
@@ -19,6 +19,8 @@ package org.apache.camel.kafkaconnector.syslog.sink;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
@@ -31,6 +33,7 @@ import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
@@ -40,11 +43,13 @@ import static org.junit.jupiter.api.Assertions.fail;
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
- private static final int FREE_PORT = NetworkUtils.getFreePort("localhost", NetworkUtils.Protocol.UDP);
+ private static final String HOST = "localhost";
+ private static final String PROTOCOL = "udp";
+ private static final int FREE_PORT = NetworkUtils.getFreePort(HOST, NetworkUtils.Protocol.UDP);
private static final String TEST_TXT = "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!";
@RegisterExtension
- public static SyslogService syslogService = new SyslogService("udp", "//localhost", FREE_PORT);
+ public static SyslogService service = SyslogService.sinkSyslogServiceFactory(PROTOCOL, HOST, FREE_PORT);
private String topicName;
private final int expect = 1;
@@ -79,7 +84,15 @@ public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
@Override
protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
if (latch.await(30, TimeUnit.SECONDS)) {
- assertEquals(TEST_TXT, syslogService.getFirstExchangeToBeReceived().getIn().getBody(String.class));
+ Exchange exchange = service.getFirstExchangeToBeReceived();
+ assertNotNull(exchange, "There should have been an exchange received");
+ Message message = exchange.getIn();
+ assertNotNull(message, "There should have been a message in the exchange");
+
+ String body = message.getBody(String.class);
+ assertNotNull(body, "The message body should not be null");
+ assertEquals(TEST_TXT, message.getBody(String.class),
+ "The received message body does not match the expected message");
} else {
fail("Timed out wait for data to be added to the Kafka cluster");
}
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
index 7f14a2e..3a14ac2 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
@@ -19,26 +19,18 @@ package org.apache.camel.kafkaconnector.syslog.source;
import java.util.concurrent.ExecutionException;
-import org.apache.camel.CamelContext;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.syslog.SyslogDataFormat;
-import org.apache.camel.component.syslog.netty.Rfc5425Encoder;
-import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageConsumer;
import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
+import org.apache.camel.kafkaconnector.syslog.services.SyslogService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.jupiter.api.extension.RegisterExtension;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -49,57 +41,32 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSourceSyslogITCase extends CamelSourceTestSupport {
- private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSyslogITCase.class);
private static final String HOST = "localhost";
private static final String PROTOCOL = "udp";
private static final int FREE_PORT = NetworkUtils.getFreePort(HOST, NetworkUtils.Protocol.UDP);
+ @RegisterExtension
+ public static SyslogService service = SyslogService.sourceSyslogServiceFactory(PROTOCOL, "localhost", FREE_PORT);
+
private final int expect = 1;
- private ConnectorPropertyFactory connectorPropertyFactory;
private String topicName;
- private CamelContext camelContext;
-
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-syslog-kafka-connector"};
}
- @BeforeAll
- public void setupCamelContext() throws Exception {
- LOG.debug("Creating the Camel context");
- camelContext = new DefaultCamelContext();
- camelContext.getRegistry().bind("encoder", new Rfc5425Encoder());
-
- LOG.debug("Adding routes");
- camelContext.addRoutes(new RouteBuilder() {
- @Override
- public void configure() {
- from("direct:test")
- .marshal(new SyslogDataFormat())
- .toF("netty:%s://%s:%d?sync=false&encoders=#encoder&useByteBuf=true", PROTOCOL, HOST, FREE_PORT);
- }
- });
- }
-
@BeforeEach
public void setUp() {
topicName = getTopicForTest(this);
-
- camelContext.start();
- TestUtils.waitFor(camelContext::isStarted);
}
- @AfterEach
- public void tearDown() {
- camelContext.stop();
- }
@Override
protected void produceTestData() {
String message = "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!";
- camelContext.createProducerTemplate().sendBody("direct:test", message);
+ service.getCamelContext().createProducerTemplate().sendBody("direct:test", message);
}
@Override
@@ -113,7 +80,7 @@ public class CamelSourceSyslogITCase extends CamelSourceTestSupport {
@RepeatedTest(3)
@Timeout(90)
public void testBasicSend() throws ExecutionException, InterruptedException {
- connectorPropertyFactory = CamelSyslogPropertyFactory
+ ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
.basic()
.withKafkaTopic(topicName)
.withHost(HOST)