You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/03/02 16:53:58 UTC
[camel-kafka-connector] 04/05: Convert the CXF sink test case to use the base Sink test class
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 633931b5f5494bae20830de5daa5a83f38409c40
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Feb 19 14:05:29 2021 +0100
Convert the CXF sink test case to use the base Sink test class
- Move common test code to a common package
- Remove exception handling code
- Move the request to separate files
---
tests/itests-cxf/pom.xml | 5 +
.../cxf/{source => common}/HelloService.java | 2 +-
.../cxf/sink/CamelSinkCXFITCase.java | 122 ++++++++++-----------
.../kafkaconnector/cxf/sink/HelloServiceImpl.java | 2 +-
.../sink/SinkServerFactoryBeanConfigurator.java | 2 +-
.../cxf/source/CamelSourceCXFITCase.java | 7 +-
.../src/test/resources/hello-service-test.xml | 26 +++++
tests/itests-cxf/src/test/resources/jaxws-test.xml | 26 +++++
8 files changed, 121 insertions(+), 71 deletions(-)
diff --git a/tests/itests-cxf/pom.xml b/tests/itests-cxf/pom.xml
index fc353ea..1882492 100644
--- a/tests/itests-cxf/pom.xml
+++ b/tests/itests-cxf/pom.xml
@@ -94,6 +94,11 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+
</dependencies>
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/common/HelloService.java
similarity index 95%
rename from tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java
rename to tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/common/HelloService.java
index 5c4653f..90429a6 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/common/HelloService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.camel.kafkaconnector.cxf.source;
+package org.apache.camel.kafkaconnector.cxf.common;
import java.util.List;
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
index 68bb9e7..6690341 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
@@ -17,17 +17,18 @@
package org.apache.camel.kafkaconnector.cxf.sink;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.cxf.services.CXFEmbeddedServerService;
import org.apache.camel.kafkaconnector.cxf.services.CXFService;
import org.apache.camel.kafkaconnector.cxf.services.JaxWsServiceConfigurator;
+import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -39,27 +40,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
-public class CamelSinkCXFITCase extends AbstractKafkaTest {
- protected static final String TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
- + "<soap:Body><ns1:echo xmlns:ns1=\"http://source.cxf.kafkaconnector.camel.apache.org/\">"
- + "<arg0 xmlns=\"http://source.cxf.kafkaconnector.camel.apache.org/\">hello world</arg0>"
- + "</ns1:echo></soap:Body></soap:Envelope>";
-
- protected static final String JAXWS_TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">\"\n"
- + " + \"<soap:Body><ns1:greetMe xmlns:ns1=\"http://apache.org/hello_world_soap_http/types\">\"\n"
- + " + \"<requestType xmlns=\"http://apache.org/hello_world_soap_http/types\">hello world!</requestType>\"\n"
- + " + \"</ns1:greetMe></soap:Body></soap:Envelope>";
-
+public class CamelSinkCXFITCase extends CamelSinkTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkCXFITCase.class);
-
+
private SinkServerFactoryBeanConfigurator serverFactoryBeanConfigurator = new SinkServerFactoryBeanConfigurator();
private JaxWsServiceConfigurator jaxWsServiceConfigurator = new SinkJaxWsServiceConfigurator();
@RegisterExtension
public CXFService service = new CXFEmbeddedServerService(serverFactoryBeanConfigurator, jaxWsServiceConfigurator);
-
-
private final int expect = 10;
private String topicName;
@@ -74,17 +63,35 @@ public class CamelSinkCXFITCase extends AbstractKafkaTest {
GreeterImpl.outputFile().delete();
}
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
+ try {
+ // NO-OP (the messages are consumed on each service implementation)
+ Thread.sleep(5000);
+ } catch (Exception e) {
+ LOG.warn("Interrupted");
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ // NO-OP (specific for each)
+ if (!latch.await(30, TimeUnit.SECONDS)) {
+ fail("Failed to receive the messages within the specified time: received %d of %d");
+ }
+ }
+
private void putRecords(String message, int count) {
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- for (int i = 0; i < count; i++) {
- try {
+ try {
+ for (int i = 0; i < count; i++) {
kafkaClient.produce(topicName, message);
- } catch (ExecutionException e) {
- LOG.error("Unable to produce messages: {}", e.getMessage(), e);
- } catch (InterruptedException e) {
- break;
}
+ } catch (Exception e) {
+ fail(e.getMessage());
}
}
@@ -92,54 +99,39 @@ public class CamelSinkCXFITCase extends AbstractKafkaTest {
return service.getJaxWsServerAddress() + "?serviceClass=org.apache.hello_world_soap_http.Greeter";
}
- public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String message, int count)
- throws ExecutionException, InterruptedException, TimeoutException {
- connectorPropertyFactory.log();
- getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
- ExecutorService service = Executors.newCachedThreadPool();
- Runnable r = () -> this.putRecords(message, count);
- service.submit(r);
- Thread.sleep(5000);
- LOG.debug("Created the consumer ... About to receive messages");
- }
-
@Test
- public void testBasicSendReceiveUsingUrl() {
- try {
- ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory
- .basic()
- .withName("CamelCXFSinkConnector")
- .withTopics(topicName)
- .withAddress(service.getSimpleServerAddress())
- .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService")
- .withDataFormat("RAW");
+ public void testBasicSendReceiveUsingUrl() throws Exception {
+ InputStream stream = this.getClass().getResource("/hello-service-test.xml").openStream();
+ String testMessage = IOUtils.toString(stream, Charset.defaultCharset());
- runTest(connectorPropertyFactory, TEST_MESSAGE, expect);
+ ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory
+ .basic()
+ .withName("CamelCXFSinkConnector")
+ .withTopics(topicName)
+ .withAddress(service.getSimpleServerAddress())
+ .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService")
+ .withDataFormat("RAW");
- assertEquals(expect, serverFactoryBeanConfigurator.getInvocationCount());
- } catch (Exception e) {
- LOG.error("CXF Sink test failed: {} {}", e.getMessage(), e);
- fail(e.getMessage(), e);
- }
+ runTest(connectorPropertyFactory, () -> putRecords(testMessage, expect));
+
+ assertEquals(expect, serverFactoryBeanConfigurator.getInvocationCount());
}
@Test
@Timeout(90)
- public void testJaxWsBasicSendReceiveUsingUrl() {
- try {
- ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory
- .basic()
- .withName("CamelCXFSinkConnectorUrl")
- .withTopics(topicName)
- .withAddress(getJaxwsEndpointUri())
- .withDataFormat("RAW");
+ public void testJaxWsBasicSendReceiveUsingUrl() throws Exception {
+ InputStream stream = this.getClass().getResource("/jaxws-test.xml").openStream();
+ String testMessage = IOUtils.toString(stream, Charset.defaultCharset());
- runTest(connectorPropertyFactory, JAXWS_TEST_MESSAGE, 1);
+ ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory
+ .basic()
+ .withName("CamelCXFSinkConnectorUrl")
+ .withTopics(topicName)
+ .withAddress(getJaxwsEndpointUri())
+ .withDataFormat("RAW");
- assertTrue(GreeterImpl.outputFile().exists(), "The test output file was not created");
- } catch (Exception e) {
- LOG.error("CXF Sink test failed: {} {}", e.getMessage(), e);
- fail(e.getMessage(), e);
- }
+ runTest(connectorPropertyFactory, () -> putRecords(testMessage, 1));
+
+ assertTrue(GreeterImpl.outputFile().exists(), "The test output file was not created");
}
}
\ No newline at end of file
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java
index 88f1f12..3aeed6d 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java
@@ -18,7 +18,7 @@ package org.apache.camel.kafkaconnector.cxf.sink;
import java.util.List;
-import org.apache.camel.kafkaconnector.cxf.source.HelloService;
+import org.apache.camel.kafkaconnector.cxf.common.HelloService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java
index 28860da..e410d09 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java
@@ -16,8 +16,8 @@
*/
package org.apache.camel.kafkaconnector.cxf.sink;
+import org.apache.camel.kafkaconnector.cxf.common.HelloService;
import org.apache.camel.kafkaconnector.cxf.services.ServerFactoryBeanConfigurator;
-import org.apache.camel.kafkaconnector.cxf.source.HelloService;
import org.apache.cxf.frontend.ServerFactoryBean;
class SinkServerFactoryBeanConfigurator implements ServerFactoryBeanConfigurator {
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
index 9a75ea4..a4327ed 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
@@ -25,6 +25,7 @@ import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.cxf.client.CXFServiceUtil;
+import org.apache.camel.kafkaconnector.cxf.common.HelloService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -43,7 +44,7 @@ public class CamelSourceCXFITCase extends AbstractKafkaTest {
protected static final int PORT = NetworkUtils.getFreePort("localhost");
protected static final String SIMPLE_ENDPOINT_ADDRESS = "http://localhost:" + PORT + "/CxfConsumerTest/test";
protected static final String SIMPLE_ENDPOINT_URI = SIMPLE_ENDPOINT_ADDRESS
- + "?serviceClass=org.apache.camel.kafkaconnector.cxf.source.HelloService"
+ + "?serviceClass=org.apache.camel.kafkaconnector.cxf.common.HelloService"
+ "&publishedEndpointUrl=http://www.simple.com/services/test";
private static final String TEST_MESSAGE = "Hello World!";
@@ -104,7 +105,7 @@ public class CamelSourceCXFITCase extends AbstractKafkaTest {
try {
ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(SIMPLE_ENDPOINT_ADDRESS)
- .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService");
+ .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService");
runBasicStringTest(connectorPropertyFactory);
} catch (Exception e) {
@@ -134,7 +135,7 @@ public class CamelSourceCXFITCase extends AbstractKafkaTest {
try {
ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(SIMPLE_ENDPOINT_ADDRESS)
- .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService").withDataFormat("POJO");
+ .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService").withDataFormat("POJO");
runBasicStringTest(connectorPropertyFactory);
} catch (Exception e) {
diff --git a/tests/itests-cxf/src/test/resources/hello-service-test.xml b/tests/itests-cxf/src/test/resources/hello-service-test.xml
new file mode 100644
index 0000000..9405a47
--- /dev/null
+++ b/tests/itests-cxf/src/test/resources/hello-service-test.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
+ <soap:Body>
+ <ns1:echo xmlns:ns1="http://source.cxf.kafkaconnector.camel.apache.org/">
+ <arg0 xmlns="http://source.cxf.kafkaconnector.camel.apache.org/">hello world</arg0>
+ </ns1:echo>
+ </soap:Body>
+</soap:Envelope>
\ No newline at end of file
diff --git a/tests/itests-cxf/src/test/resources/jaxws-test.xml b/tests/itests-cxf/src/test/resources/jaxws-test.xml
new file mode 100644
index 0000000..02b330c
--- /dev/null
+++ b/tests/itests-cxf/src/test/resources/jaxws-test.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
+ <soap:Body>
+ <ns1:greetMe xmlns:ns1="http://apache.org/hello_world_soap_http/types">
+ <requestType xmlns="http://apache.org/hello_world_soap_http/types">hello world!</requestType>
+ </ns1:greetMe>
+ </soap:Body>
+</soap:Envelope>
\ No newline at end of file