You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/22 15:10:14 UTC

[camel-kafka-connector] branch camel-master updated: Convert the CXF source test case to use the base source test class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/camel-master by this push:
     new da58b33  Convert the CXF source test case to use the base source test class
da58b33 is described below

commit da58b33f4b3dd4345f074999e6885c39bd92a6e4
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 22 15:28:52 2021 +0100

    Convert the CXF source test case to use the base source test class
---
 .../common/test/CamelSourceTestSupport.java        |  14 +++
 .../cxf/source/CamelSourceCXFITCase.java           | 125 ++++++++++-----------
 2 files changed, 71 insertions(+), 68 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
index 88f278d..5bb6a93 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -120,5 +120,19 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
         LOG.debug("Verified messages");
     }
 
+    /**
+     * A simple test runner that follows the steps: initialize, start producer, consume messages, verify results
+     *
+     * @param connectorPropertyFactory A factory for connector properties
+     * @param topic the topic the message consumer reads from
+     * @param count the message count handed to the consumer (presumably the number of messages to wait for)
+     * @throws ExecutionException For test-specific exceptions; InterruptedException is declared as well
+     */
+    public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws ExecutionException, InterruptedException {
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        StringMessageConsumer consumer = new StringMessageConsumer(kafkaClient, topic, count);
+
+        runTestBlocking(connectorPropertyFactory, consumer);
+    }
 
 }
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
index a4327ed..b9f04c2 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
@@ -19,27 +19,26 @@ package org.apache.camel.kafkaconnector.cxf.source;
 
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.cxf.client.CXFServiceUtil;
 import org.apache.camel.kafkaconnector.cxf.common.HelloService;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /**
  * A simple test case that checks whether the CXF Consumer Endpoint produces the expected number of messages
  */
-public class CamelSourceCXFITCase extends AbstractKafkaTest {
+public class CamelSourceCXFITCase extends CamelSourceTestSupport {
 
     protected static final int PORT = NetworkUtils.getFreePort("localhost");
     protected static final String SIMPLE_ENDPOINT_ADDRESS = "http://localhost:" + PORT + "/CxfConsumerTest/test";
@@ -47,101 +46,91 @@ public class CamelSourceCXFITCase extends AbstractKafkaTest {
             + "?serviceClass=org.apache.camel.kafkaconnector.cxf.common.HelloService"
             + "&publishedEndpointUrl=http://www.simple.com/services/test";
 
-    private static final String TEST_MESSAGE = "Hello World!";
-
     private static final Logger LOG = LoggerFactory.getLogger(CamelSourceCXFITCase.class);
 
-    private int received;
-    private final int expect = 1;
+    private final int expect = 10;
 
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-cxf-kafka-connector"};
     }
 
-    @BeforeEach
-    public void setUp() {
-        received = 0;
-    }
-
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-        LOG.debug("Received: {}", record.value());
 
-        received++;
+    @Override
+    protected void produceTestData() {
+        TestUtils.waitFor(() -> NetworkUtils.portIsOpen("localhost", PORT));
 
-        if (received == expect) {
-            return false;
-        }
+        try {
+            HelloService client = CXFServiceUtil.getService(SIMPLE_ENDPOINT_ADDRESS, HelloService.class);
 
-        return true;
-    }
+            for (int i = 0; i < expect; i++) {
+                client.echo("Test message " + i);
+            }
 
-    public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory)
-            throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
-        // ensure cxf source connector is up
-        Thread.sleep(5000);
-        HelloService client = CXFServiceUtil.getService(SIMPLE_ENDPOINT_ADDRESS, HelloService.class);
-        try {
-            String result = client.echo(TEST_MESSAGE);
-            assertEquals(result, TEST_MESSAGE);
         } catch (Exception e) {
-            LOG.info("Test Invocation Failure", e);
+            LOG.info("Unable to invoke service: {}", e.getMessage(), e);
+            fail("Unable to invoke service");
         }
+    }
+
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        LOG.info("Consumed messages: {}", consumer.consumedMessages());
 
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
+        for (ConsumerRecord<String, ?> record : consumer.consumedMessages()) {
+            Object receivedObject = record.value();
+            if (!(receivedObject instanceof String)) {
+                fail("Unexpected message type");
+            }
 
-        assertEquals(received, expect, "Didn't process the expected amount of messages");
+            String result = (String) receivedObject;
+            assertTrue(result.contains("Test message"));
+        }
     }
 
+
     @Test
     @Timeout(20)
-    public void testBasicSendReceive() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory.basic()
-                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(SIMPLE_ENDPOINT_ADDRESS)
-                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService");
+    public void testBasicSendReceive() throws ExecutionException, InterruptedException {
+        String topicName = getTopicForTest(this);
 
-            runBasicStringTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("CXF test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withAddress(SIMPLE_ENDPOINT_ADDRESS)
+                .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService");
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
     @Timeout(20)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory.basic()
-                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withUrl(SIMPLE_ENDPOINT_URI)
-                    .buildUrl();
+    public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException {
+        String topicName = getTopicForTest(this);
 
-            runBasicStringTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("CXF test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withUrl(SIMPLE_ENDPOINT_URI)
+                .buildUrl();
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
     @Timeout(20)
-    public void testBasicSendReceiveUsingDataFormat() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory.basic()
-                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(SIMPLE_ENDPOINT_ADDRESS)
-                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService").withDataFormat("POJO");
+    public void testBasicSendReceiveUsingDataFormat() throws ExecutionException, InterruptedException {
+        String topicName = getTopicForTest(this);
 
-            runBasicStringTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("CXF test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withAddress(SIMPLE_ENDPOINT_ADDRESS)
+                .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService")
+                .withDataFormat("POJO");
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
     }
 
 }