You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/03/02 16:53:58 UTC

[camel-kafka-connector] 04/05: Convert the CXF sink test case to use the base Sink test class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 633931b5f5494bae20830de5daa5a83f38409c40
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Feb 19 14:05:29 2021 +0100

    Convert the CXF sink test case to use the base Sink test class
    
    - Move common test code to a common package
    - Remove exception handling code
    - Move the request to separate files
---
 tests/itests-cxf/pom.xml                           |   5 +
 .../cxf/{source => common}/HelloService.java       |   2 +-
 .../cxf/sink/CamelSinkCXFITCase.java               | 122 ++++++++++-----------
 .../kafkaconnector/cxf/sink/HelloServiceImpl.java  |   2 +-
 .../sink/SinkServerFactoryBeanConfigurator.java    |   2 +-
 .../cxf/source/CamelSourceCXFITCase.java           |   7 +-
 .../src/test/resources/hello-service-test.xml      |  26 +++++
 tests/itests-cxf/src/test/resources/jaxws-test.xml |  26 +++++
 8 files changed, 121 insertions(+), 71 deletions(-)

diff --git a/tests/itests-cxf/pom.xml b/tests/itests-cxf/pom.xml
index fc353ea..1882492 100644
--- a/tests/itests-cxf/pom.xml
+++ b/tests/itests-cxf/pom.xml
@@ -94,6 +94,11 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+
     </dependencies>
 
 
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/common/HelloService.java
similarity index 95%
rename from tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java
rename to tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/common/HelloService.java
index 5c4653f..90429a6 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/common/HelloService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kafkaconnector.cxf.source;
+package org.apache.camel.kafkaconnector.cxf.common;
 
 import java.util.List;
 
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
index 68bb9e7..6690341 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
@@ -17,17 +17,18 @@
 
 package org.apache.camel.kafkaconnector.cxf.sink;
 
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.cxf.services.CXFEmbeddedServerService;
 import org.apache.camel.kafkaconnector.cxf.services.CXFService;
 import org.apache.camel.kafkaconnector.cxf.services.JaxWsServiceConfigurator;
+import org.apache.commons.io.IOUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -39,27 +40,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
-public class CamelSinkCXFITCase extends AbstractKafkaTest {
-    protected static final String TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
-            + "<soap:Body><ns1:echo xmlns:ns1=\"http://source.cxf.kafkaconnector.camel.apache.org/\">"
-            + "<arg0 xmlns=\"http://source.cxf.kafkaconnector.camel.apache.org/\">hello world</arg0>"
-            + "</ns1:echo></soap:Body></soap:Envelope>";
-
-    protected static final String JAXWS_TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">\"\n"
-            + "        + \"<soap:Body><ns1:greetMe xmlns:ns1=\"http://apache.org/hello_world_soap_http/types\">\"\n"
-            + "        + \"<requestType xmlns=\"http://apache.org/hello_world_soap_http/types\">hello world!</requestType>\"\n"
-            + "        + \"</ns1:greetMe></soap:Body></soap:Envelope>";
-
+public class CamelSinkCXFITCase extends CamelSinkTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkCXFITCase.class);
-    
+
     private SinkServerFactoryBeanConfigurator serverFactoryBeanConfigurator = new SinkServerFactoryBeanConfigurator();
     private JaxWsServiceConfigurator jaxWsServiceConfigurator = new SinkJaxWsServiceConfigurator();
 
     @RegisterExtension
     public CXFService service = new CXFEmbeddedServerService(serverFactoryBeanConfigurator, jaxWsServiceConfigurator);
 
-
-
     private final int expect = 10;
     private String topicName;
 
@@ -74,17 +63,35 @@ public class CamelSinkCXFITCase extends AbstractKafkaTest {
         GreeterImpl.outputFile().delete();
     }
 
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            // Nothing to consume here (each service implementation receives the messages); pause 5s, then release the latch
+            Thread.sleep(5000);
+        } catch (Exception e) {
+            LOG.warn("Interrupted");
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        // Only waits (up to 30s) for the latch; each test asserts its own results after runTest returns
+        if (!latch.await(30, TimeUnit.SECONDS)) {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
     private void putRecords(String message, int count) {
         KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
 
-        for (int i = 0; i < count; i++) {
-            try {
+        try {
+            for (int i = 0; i < count; i++) {
                 kafkaClient.produce(topicName, message);
-            } catch (ExecutionException e) {
-                LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-            } catch (InterruptedException e) {
-                break;
             }
+        } catch (Exception e) {
+            fail(e.getMessage());
         }
     }
 
@@ -92,54 +99,39 @@ public class CamelSinkCXFITCase extends AbstractKafkaTest {
         return service.getJaxWsServerAddress() + "?serviceClass=org.apache.hello_world_soap_http.Greeter";
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String message, int count)
-            throws ExecutionException, InterruptedException, TimeoutException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-        ExecutorService service = Executors.newCachedThreadPool();
-        Runnable r = () -> this.putRecords(message, count);
-        service.submit(r);
-        Thread.sleep(5000);
-        LOG.debug("Created the consumer ... About to receive messages");
-    }
-
     @Test
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory
-                    .basic()
-                    .withName("CamelCXFSinkConnector")
-                    .withTopics(topicName)
-                    .withAddress(service.getSimpleServerAddress())
-                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService")
-                    .withDataFormat("RAW");
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        InputStream stream = this.getClass().getResource("/hello-service-test.xml").openStream();
+        String testMessage = IOUtils.toString(stream, Charset.defaultCharset());
 
-            runTest(connectorPropertyFactory, TEST_MESSAGE, expect);
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory
+                .basic()
+                .withName("CamelCXFSinkConnector")
+                .withTopics(topicName)
+                .withAddress(service.getSimpleServerAddress())
+                .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService")
+                .withDataFormat("RAW");
 
-            assertEquals(expect, serverFactoryBeanConfigurator.getInvocationCount());
-        } catch (Exception e) {
-            LOG.error("CXF Sink test failed: {} {}", e.getMessage(), e);
-            fail(e.getMessage(), e);
-        }
+        runTest(connectorPropertyFactory, () -> putRecords(testMessage, expect));
+
+        assertEquals(expect, serverFactoryBeanConfigurator.getInvocationCount());
     }
 
     @Test
     @Timeout(90)
-    public void testJaxWsBasicSendReceiveUsingUrl() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory
-                    .basic()
-                    .withName("CamelCXFSinkConnectorUrl")
-                    .withTopics(topicName)
-                    .withAddress(getJaxwsEndpointUri())
-                    .withDataFormat("RAW");
+    public void testJaxWsBasicSendReceiveUsingUrl() throws Exception {
+        InputStream stream = this.getClass().getResource("/jaxws-test.xml").openStream();
+        String testMessage = IOUtils.toString(stream, Charset.defaultCharset());
 
-            runTest(connectorPropertyFactory, JAXWS_TEST_MESSAGE, 1);
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory
+                .basic()
+                .withName("CamelCXFSinkConnectorUrl")
+                .withTopics(topicName)
+                .withAddress(getJaxwsEndpointUri())
+                .withDataFormat("RAW");
 
-            assertTrue(GreeterImpl.outputFile().exists(), "The test output file was not created");
-        } catch (Exception e) {
-            LOG.error("CXF Sink test failed: {} {}", e.getMessage(), e);
-            fail(e.getMessage(), e);
-        }
+        runTest(connectorPropertyFactory, () -> putRecords(testMessage, 1));
+
+        assertTrue(GreeterImpl.outputFile().exists(), "The test output file was not created");
     }
 }
\ No newline at end of file
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java
index 88f1f12..3aeed6d 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java
@@ -18,7 +18,7 @@ package org.apache.camel.kafkaconnector.cxf.sink;
 
 import java.util.List;
 
-import org.apache.camel.kafkaconnector.cxf.source.HelloService;
+import org.apache.camel.kafkaconnector.cxf.common.HelloService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java
index 28860da..e410d09 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java
@@ -16,8 +16,8 @@
  */
 package org.apache.camel.kafkaconnector.cxf.sink;
 
+import org.apache.camel.kafkaconnector.cxf.common.HelloService;
 import org.apache.camel.kafkaconnector.cxf.services.ServerFactoryBeanConfigurator;
-import org.apache.camel.kafkaconnector.cxf.source.HelloService;
 import org.apache.cxf.frontend.ServerFactoryBean;
 
 class SinkServerFactoryBeanConfigurator implements ServerFactoryBeanConfigurator {
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
index 9a75ea4..a4327ed 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
@@ -25,6 +25,7 @@ import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.cxf.client.CXFServiceUtil;
+import org.apache.camel.kafkaconnector.cxf.common.HelloService;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -43,7 +44,7 @@ public class CamelSourceCXFITCase extends AbstractKafkaTest {
     protected static final int PORT = NetworkUtils.getFreePort("localhost");
     protected static final String SIMPLE_ENDPOINT_ADDRESS = "http://localhost:" + PORT + "/CxfConsumerTest/test";
     protected static final String SIMPLE_ENDPOINT_URI = SIMPLE_ENDPOINT_ADDRESS
-            + "?serviceClass=org.apache.camel.kafkaconnector.cxf.source.HelloService"
+            + "?serviceClass=org.apache.camel.kafkaconnector.cxf.common.HelloService"
             + "&publishedEndpointUrl=http://www.simple.com/services/test";
 
     private static final String TEST_MESSAGE = "Hello World!";
@@ -104,7 +105,7 @@ public class CamelSourceCXFITCase extends AbstractKafkaTest {
         try {
             ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory.basic()
                     .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(SIMPLE_ENDPOINT_ADDRESS)
-                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService");
+                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService");
 
             runBasicStringTest(connectorPropertyFactory);
         } catch (Exception e) {
@@ -134,7 +135,7 @@ public class CamelSourceCXFITCase extends AbstractKafkaTest {
         try {
             ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory.basic()
                     .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(SIMPLE_ENDPOINT_ADDRESS)
-                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService").withDataFormat("POJO");
+                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService").withDataFormat("POJO");
 
             runBasicStringTest(connectorPropertyFactory);
         } catch (Exception e) {
diff --git a/tests/itests-cxf/src/test/resources/hello-service-test.xml b/tests/itests-cxf/src/test/resources/hello-service-test.xml
new file mode 100644
index 0000000..9405a47
--- /dev/null
+++ b/tests/itests-cxf/src/test/resources/hello-service-test.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
+    <soap:Body>
+        <ns1:echo xmlns:ns1="http://source.cxf.kafkaconnector.camel.apache.org/">
+            <arg0 xmlns="http://source.cxf.kafkaconnector.camel.apache.org/">hello world</arg0>
+        </ns1:echo>
+    </soap:Body>
+</soap:Envelope>
\ No newline at end of file
diff --git a/tests/itests-cxf/src/test/resources/jaxws-test.xml b/tests/itests-cxf/src/test/resources/jaxws-test.xml
new file mode 100644
index 0000000..02b330c
--- /dev/null
+++ b/tests/itests-cxf/src/test/resources/jaxws-test.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
+    <soap:Body>
+        <ns1:greetMe xmlns:ns1="http://apache.org/hello_world_soap_http/types">
+            <requestType xmlns="http://apache.org/hello_world_soap_http/types">hello world!</requestType>
+        </ns1:greetMe>
+    </soap:Body>
+</soap:Envelope>
\ No newline at end of file