You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/15 10:39:05 UTC

[camel-kafka-connector] 04/10: Rework the syslog integration tests to let JUnit handle camel's lifecycle in the test

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit dd5ce04329c4601ae32027971335d0f051c27ed3
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Feb 12 16:11:18 2021 +0100

    Rework the syslog integration tests to let JUnit handle camel's lifecycle in the test
    
    This fixes a few false negatives related to starting/stopping the CamelContext in the test
---
 .../syslog/services/RouteConfigurator.java         | 24 ++++++++++
 ...slogService.java => SinkRouteConfigurator.java} | 41 +++++++---------
 .../syslog/services/SourceRouteConfigurator.java   | 54 ++++++++++++++++++++++
 .../syslog/services/SyslogService.java             | 49 +++++++++++---------
 .../syslog/sink/CamelSinkSyslogITCase.java         | 19 ++++++--
 .../syslog/source/CamelSourceSyslogITCase.java     | 47 +++----------------
 6 files changed, 145 insertions(+), 89 deletions(-)

diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/RouteConfigurator.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/RouteConfigurator.java
new file mode 100644
index 0000000..4b20f72
--- /dev/null
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/RouteConfigurator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.syslog.services;
+
+import org.apache.camel.CamelContext;
+
+public interface RouteConfigurator {
+    void configure(CamelContext camelContext) throws Exception;
+}
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SinkRouteConfigurator.java
similarity index 51%
copy from tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java
copy to tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SinkRouteConfigurator.java
index 3bc07b3..83f1a30 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SinkRouteConfigurator.java
@@ -14,49 +14,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.camel.kafkaconnector.syslog.services;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.syslog.SyslogDataFormat;
 import org.apache.camel.component.syslog.netty.Rfc5425FrameDecoder;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.junit.jupiter.api.extension.AfterAllCallback;
-import org.junit.jupiter.api.extension.BeforeAllCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class SyslogService implements BeforeAllCallback, AfterAllCallback {
-    private static final CamelContext CAMEL_CONTEXT = new DefaultCamelContext();
+public class SinkRouteConfigurator implements RouteConfigurator {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkRouteConfigurator.class);
 
-    private static String protocol;
-    private static String host;
-    private static int port;
+    private final String protocol;
+    private final String host;
+    private final int port;
 
-    public SyslogService(String protocol, String host, int port) {
+    public SinkRouteConfigurator(String protocol, String host, int port) {
         this.protocol = protocol;
         this.host = host;
         this.port = port;
     }
 
     @Override
-    public void beforeAll(ExtensionContext context) throws Exception {
-        CAMEL_CONTEXT.getRegistry().bind("decoder", new Rfc5425FrameDecoder());
-        CAMEL_CONTEXT.addRoutes(new RouteBuilder() {
+    public void configure(CamelContext camelContext) throws Exception {
+        camelContext.getRegistry().bind("decoder", new Rfc5425FrameDecoder());
+
+        LOG.debug("Adding routes");
+        camelContext.addRoutes(new RouteBuilder() {
             @Override
             public void configure() {
-                from("netty:" + protocol + ":" + host + ":" + port + "?sync=false&decoders=#decoder").unmarshal(new SyslogDataFormat()).to("seda:syslog");
+                fromF("netty:%s://%s:%d?sync=false&decoders=#decoder", protocol, host, port)
+                        .unmarshal(new SyslogDataFormat()).to("seda:syslog");
             }
         });
-        CAMEL_CONTEXT.start();
-    }
-
-    @Override
-    public void afterAll(ExtensionContext context) {
-        CAMEL_CONTEXT.stop();
-    }
-
-    public Exchange getFirstExchangeToBeReceived() {
-        return CAMEL_CONTEXT.createConsumerTemplate().receive("seda:syslog", 10000L);
     }
 }
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SourceRouteConfigurator.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SourceRouteConfigurator.java
new file mode 100644
index 0000000..a088940
--- /dev/null
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SourceRouteConfigurator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.syslog.services;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.syslog.SyslogDataFormat;
+import org.apache.camel.component.syslog.netty.Rfc5425Encoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SourceRouteConfigurator implements RouteConfigurator {
+    private static final Logger LOG = LoggerFactory.getLogger(SourceRouteConfigurator.class);
+
+    private final String protocol;
+    private final String host;
+    private final int port;
+
+    public SourceRouteConfigurator(String protocol, String host, int port) {
+        this.protocol = protocol;
+        this.host = host;
+        this.port = port;
+    }
+
+    @Override
+    public void configure(CamelContext camelContext) throws Exception {
+        camelContext.getRegistry().bind("encoder", new Rfc5425Encoder());
+
+        LOG.debug("Adding routes");
+        camelContext.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:test")
+                        .marshal(new SyslogDataFormat())
+                        .toF("netty:%s://%s:%d?sync=false&encoders=#encoder&useByteBuf=true", protocol, host, port);
+            }
+        });
+    }
+}
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java
index 3bc07b3..53195ca 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java
@@ -18,45 +18,52 @@ package org.apache.camel.kafkaconnector.syslog.services;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.syslog.SyslogDataFormat;
-import org.apache.camel.component.syslog.netty.Rfc5425FrameDecoder;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.infra.common.TestUtils;
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 
 public class SyslogService implements BeforeAllCallback, AfterAllCallback {
-    private static final CamelContext CAMEL_CONTEXT = new DefaultCamelContext();
+    private final CamelContext camelContext = new DefaultCamelContext();
 
-    private static String protocol;
-    private static String host;
-    private static int port;
+    private final RouteConfigurator routeConfigurator;
 
-    public SyslogService(String protocol, String host, int port) {
-        this.protocol = protocol;
-        this.host = host;
-        this.port = port;
+    public SyslogService(RouteConfigurator routeConfigurator) {
+        this.routeConfigurator = routeConfigurator;
     }
 
     @Override
     public void beforeAll(ExtensionContext context) throws Exception {
-        CAMEL_CONTEXT.getRegistry().bind("decoder", new Rfc5425FrameDecoder());
-        CAMEL_CONTEXT.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() {
-                from("netty:" + protocol + ":" + host + ":" + port + "?sync=false&decoders=#decoder").unmarshal(new SyslogDataFormat()).to("seda:syslog");
-            }
-        });
-        CAMEL_CONTEXT.start();
+        routeConfigurator.configure(camelContext);
+
+        camelContext.start();
+        TestUtils.waitFor(camelContext::isStarted);
     }
 
     @Override
     public void afterAll(ExtensionContext context) {
-        CAMEL_CONTEXT.stop();
+        camelContext.stop();
+        TestUtils.waitFor(camelContext::isStopped);
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
     }
 
     public Exchange getFirstExchangeToBeReceived() {
-        return CAMEL_CONTEXT.createConsumerTemplate().receive("seda:syslog", 10000L);
+        return camelContext.createConsumerTemplate().receive("seda:syslog", 10000L);
+    }
+
+    public static SyslogService sinkSyslogServiceFactory(String protocol, String host, int port) {
+        SinkRouteConfigurator sinkRouteConfigurator = new SinkRouteConfigurator(protocol, host, port);
+
+        return new SyslogService(sinkRouteConfigurator);
+    }
+
+    public static SyslogService sourceSyslogServiceFactory(String protocol, String host, int port) {
+        SourceRouteConfigurator sourceRouteConfigurator = new SourceRouteConfigurator(protocol, host, port);
+
+        return new SyslogService(sourceRouteConfigurator);
     }
 }
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
index 78eb2f4..a2620dc 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
@@ -19,6 +19,8 @@ package org.apache.camel.kafkaconnector.syslog.sink;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
@@ -31,6 +33,7 @@ import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.fail;
 
 
@@ -40,11 +43,13 @@ import static org.junit.jupiter.api.Assertions.fail;
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
-    private static final int FREE_PORT = NetworkUtils.getFreePort("localhost", NetworkUtils.Protocol.UDP);
+    private static final String HOST = "localhost";
+    private static final String PROTOCOL = "udp";
+    private static final int FREE_PORT = NetworkUtils.getFreePort(HOST, NetworkUtils.Protocol.UDP);
     private static final String TEST_TXT = "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!";
 
     @RegisterExtension
-    public static SyslogService syslogService = new SyslogService("udp", "//localhost", FREE_PORT);
+    public static SyslogService service = SyslogService.sinkSyslogServiceFactory(PROTOCOL, HOST, FREE_PORT);
 
     private String topicName;
     private final int expect = 1;
@@ -79,7 +84,15 @@ public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
     @Override
     protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
         if (latch.await(30, TimeUnit.SECONDS)) {
-            assertEquals(TEST_TXT, syslogService.getFirstExchangeToBeReceived().getIn().getBody(String.class));
+            Exchange exchange = service.getFirstExchangeToBeReceived();
+            assertNotNull(exchange, "There should have been an exchange received");
+            Message message = exchange.getIn();
+            assertNotNull(message, "There should have been a message in the exchange");
+
+            String body = message.getBody(String.class);
+            assertNotNull(body, "The message body should not be null");
+            assertEquals(TEST_TXT, message.getBody(String.class),
+                    "The received message body does not match the expected message");
         } else {
             fail("Timed out wait for data to be added to the Kafka cluster");
         }
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
index 7f14a2e..3a14ac2 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
@@ -19,26 +19,18 @@ package org.apache.camel.kafkaconnector.syslog.source;
 
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.syslog.SyslogDataFormat;
-import org.apache.camel.component.syslog.netty.Rfc5425Encoder;
-import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
 import org.apache.camel.kafkaconnector.common.test.StringMessageConsumer;
 import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
+import org.apache.camel.kafkaconnector.syslog.services.SyslogService;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -49,57 +41,32 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSourceSyslogITCase extends CamelSourceTestSupport {
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSyslogITCase.class);
     private static final String HOST = "localhost";
     private static final String PROTOCOL = "udp";
     private static final int FREE_PORT = NetworkUtils.getFreePort(HOST, NetworkUtils.Protocol.UDP);
 
+    @RegisterExtension
+    public static SyslogService service = SyslogService.sourceSyslogServiceFactory(PROTOCOL, "localhost", FREE_PORT);
+
     private final int expect = 1;
-    private ConnectorPropertyFactory connectorPropertyFactory;
     private String topicName;
 
-    private CamelContext camelContext;
-
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-syslog-kafka-connector"};
     }
 
-    @BeforeAll
-    public void setupCamelContext() throws Exception {
-        LOG.debug("Creating the Camel context");
-        camelContext = new DefaultCamelContext();
-        camelContext.getRegistry().bind("encoder", new Rfc5425Encoder());
-
-        LOG.debug("Adding routes");
-        camelContext.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() {
-                from("direct:test")
-                        .marshal(new SyslogDataFormat())
-                        .toF("netty:%s://%s:%d?sync=false&encoders=#encoder&useByteBuf=true", PROTOCOL, HOST, FREE_PORT);
-            }
-        });
-    }
-
     @BeforeEach
     public void setUp() {
         topicName = getTopicForTest(this);
-
-        camelContext.start();
-        TestUtils.waitFor(camelContext::isStarted);
     }
 
-    @AfterEach
-    public void tearDown() {
-        camelContext.stop();
-    }
 
     @Override
     protected void produceTestData() {
         String message = "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!";
 
-        camelContext.createProducerTemplate().sendBody("direct:test", message);
+        service.getCamelContext().createProducerTemplate().sendBody("direct:test", message);
     }
 
     @Override
@@ -113,7 +80,7 @@ public class CamelSourceSyslogITCase extends CamelSourceTestSupport {
     @RepeatedTest(3)
     @Timeout(90)
     public void testBasicSend() throws ExecutionException, InterruptedException {
-        connectorPropertyFactory = CamelSyslogPropertyFactory
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
                 .basic()
                 .withKafkaTopic(topicName)
                 .withHost(HOST)