You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/22 15:10:14 UTC
[camel-kafka-connector] branch camel-master updated: Convert the
CXF source test case to use the base source test class
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/camel-master by this push:
new da58b33 Convert the CXF source test case to use the base source test class
da58b33 is described below
commit da58b33f4b3dd4345f074999e6885c39bd92a6e4
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 22 15:28:52 2021 +0100
Convert the CXF source test case to use the base source test class
---
.../common/test/CamelSourceTestSupport.java | 14 +++
.../cxf/source/CamelSourceCXFITCase.java | 125 ++++++++++-----------
2 files changed, 71 insertions(+), 68 deletions(-)
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
index 88f278d..5bb6a93 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -120,5 +120,19 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
LOG.debug("Verified messages");
}
+ /**
+ * A simple test runner that follows the steps: initialize, start producer, consume messages, verify results
+ *
+ * @param connectorPropertyFactory A factory for connector properties
+ * @param topic the topic to send the messages to
+ * @param count the number of messages to send
+ * @throws Exception For test-specific exceptions
+ */
+ public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws ExecutionException, InterruptedException {
+ KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+ StringMessageConsumer consumer = new StringMessageConsumer(kafkaClient, topic, count);
+
+ runTestBlocking(connectorPropertyFactory, consumer);
+ }
}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
index a4327ed..b9f04c2 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
@@ -19,27 +19,26 @@ package org.apache.camel.kafkaconnector.cxf.source;
import java.util.concurrent.ExecutionException;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.cxf.client.CXFServiceUtil;
import org.apache.camel.kafkaconnector.cxf.common.HelloService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
* A simple test case that checks whether the CXF Consumer Endpoint produces the expected number of messages
*/
-public class CamelSourceCXFITCase extends AbstractKafkaTest {
+public class CamelSourceCXFITCase extends CamelSourceTestSupport {
protected static final int PORT = NetworkUtils.getFreePort("localhost");
protected static final String SIMPLE_ENDPOINT_ADDRESS = "http://localhost:" + PORT + "/CxfConsumerTest/test";
@@ -47,101 +46,91 @@ public class CamelSourceCXFITCase extends AbstractKafkaTest {
+ "?serviceClass=org.apache.camel.kafkaconnector.cxf.common.HelloService"
+ "&publishedEndpointUrl=http://www.simple.com/services/test";
- private static final String TEST_MESSAGE = "Hello World!";
-
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceCXFITCase.class);
- private int received;
- private final int expect = 1;
+ private final int expect = 10;
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-cxf-kafka-connector"};
}
- @BeforeEach
- public void setUp() {
- received = 0;
- }
-
- private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
- LOG.debug("Received: {}", record.value());
- received++;
+ @Override
+ protected void produceTestData() {
+ TestUtils.waitFor(() -> NetworkUtils.portIsOpen("localhost", PORT));
- if (received == expect) {
- return false;
- }
+ try {
+ HelloService client = CXFServiceUtil.getService(SIMPLE_ENDPOINT_ADDRESS, HelloService.class);
- return true;
- }
+ for (int i = 0; i < expect; i++) {
+ client.echo("Test message " + i);
+ }
- public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory)
- throws ExecutionException, InterruptedException {
- connectorPropertyFactory.log();
- getKafkaConnectService().initializeConnector(connectorPropertyFactory);
- // ensure cxf source connector is up
- Thread.sleep(5000);
- HelloService client = CXFServiceUtil.getService(SIMPLE_ENDPOINT_ADDRESS, HelloService.class);
- try {
- String result = client.echo(TEST_MESSAGE);
- assertEquals(result, TEST_MESSAGE);
} catch (Exception e) {
- LOG.info("Test Invocation Failure", e);
+ LOG.info("Unable to invoke service: {}", e.getMessage(), e);
+ fail("Unable to invoke service");
}
+ }
+
+ @Override
+ protected void verifyMessages(TestMessageConsumer<?> consumer) {
+ LOG.info("Consumed messages: {}", consumer.consumedMessages());
- LOG.debug("Creating the consumer ...");
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
- LOG.debug("Created the consumer ...");
+ for (ConsumerRecord<String, ?> record : consumer.consumedMessages()) {
+ Object receivedObject = consumer.consumedMessages().get(0).value();
+ if (!(receivedObject instanceof String)) {
+ fail("Unexpected message type");
+ }
- assertEquals(received, expect, "Didn't process the expected amount of messages");
+ String result = (String) receivedObject;
+ assertTrue(result.contains("Test message"));
+ }
}
+
@Test
@Timeout(20)
- public void testBasicSendReceive() {
- try {
- ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory.basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(SIMPLE_ENDPOINT_ADDRESS)
- .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService");
+ public void testBasicSendReceive() throws ExecutionException, InterruptedException {
+ String topicName = getTopicForTest(this);
- runBasicStringTest(connectorPropertyFactory);
- } catch (Exception e) {
- LOG.error("CXF test failed: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
+ ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory
+ .basic()
+ .withKafkaTopic(topicName)
+ .withAddress(SIMPLE_ENDPOINT_ADDRESS)
+ .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService");
+
+ runTestBlocking(connectorPropertyFactory, topicName, expect);
}
@Test
@Timeout(20)
- public void testBasicSendReceiveUsingUrl() {
- try {
- ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory.basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withUrl(SIMPLE_ENDPOINT_URI)
- .buildUrl();
+ public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException {
+ String topicName = getTopicForTest(this);
- runBasicStringTest(connectorPropertyFactory);
- } catch (Exception e) {
- LOG.error("CXF test failed: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
+ ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory
+ .basic()
+ .withKafkaTopic(topicName)
+ .withUrl(SIMPLE_ENDPOINT_URI)
+ .buildUrl();
+
+ runTestBlocking(connectorPropertyFactory, topicName, expect);
}
@Test
@Timeout(20)
- public void testBasicSendReceiveUsingDataFormat() {
- try {
- ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory.basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(SIMPLE_ENDPOINT_ADDRESS)
- .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService").withDataFormat("POJO");
+ public void testBasicSendReceiveUsingDataFormat() throws ExecutionException, InterruptedException {
+ String topicName = getTopicForTest(this);
- runBasicStringTest(connectorPropertyFactory);
- } catch (Exception e) {
- LOG.error("CXF test failed: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
+ ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory
+ .basic()
+ .withKafkaTopic(topicName)
+ .withAddress(SIMPLE_ENDPOINT_ADDRESS)
+ .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService")
+ .withDataFormat("POJO");
+
+ runTestBlocking(connectorPropertyFactory, topicName, expect);
}
}