You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/19 10:51:09 UTC

[camel-kafka-connector] branch camel-master updated: Refactor the CXF servers into a service for simpler reuse

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/camel-master by this push:
     new 4870fd5  Refactor the CXF servers into a service for simpler reuse
4870fd5 is described below

commit 4870fd543337637a1f7fcc1d1c0aacdea741dfbe
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Thu Feb 18 10:53:48 2021 +0100

    Refactor the CXF servers into a service for simpler reuse
---
 .../GreeterImpl.java => common/CXFProperties.java} |  13 +--
 .../cxf/services/CXFEmbeddedServerService.java     | 107 ++++++++++++++++++
 .../GreeterImpl.java => services/CXFService.java}  |  26 +++--
 .../JaxWsServiceConfigurator.java}                 |  15 +--
 .../ServerFactoryBeanConfigurator.java}            |  15 +--
 .../cxf/sink/CamelSinkCXFITCase.java               | 119 ++++++++-------------
 .../cxf/sink/CamelSinkCXFPropertyFactory.java      |   3 +-
 .../camel/kafkaconnector/cxf/sink/GreeterImpl.java |  20 +++-
 ...Impl.java => SinkJaxWsServiceConfigurator.java} |  16 +--
 ...java => SinkServerFactoryBeanConfigurator.java} |  19 ++--
 10 files changed, 223 insertions(+), 130 deletions(-)

diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/common/CXFProperties.java
similarity index 69%
copy from tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
copy to tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/common/CXFProperties.java
index 9981604..fbd581a 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/common/CXFProperties.java
@@ -15,16 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kafkaconnector.cxf.sink;
+package org.apache.camel.kafkaconnector.cxf.common;
 
-import java.util.logging.Logger;
+public final class CXFProperties {
+    public static final String SIMPLE_SERVER_ADDRESS = "cxf.simple.server.address";
+    public static final String JAXWS_SERVER_ADDRESS = "cxf.jaxws.server.address";
 
-public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
+    private CXFProperties() {
 
-    private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
-
-    public String greetMe(String hi) {
-        LOG.info("jaxws greetMe " + hi);
-        return "Greet " + hi;
     }
 }
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/CXFEmbeddedServerService.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/CXFEmbeddedServerService.java
new file mode 100644
index 0000000..6a1026b
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/CXFEmbeddedServerService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.cxf.services;
+
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.cxf.common.CXFProperties;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.ext.logging.LoggingInInterceptor;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.frontend.ServerFactoryBean;
+import org.apache.cxf.jaxws.EndpointImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CXFEmbeddedServerService implements CXFService {
+    private static final Logger LOG = LoggerFactory.getLogger(CXFEmbeddedServerService.class);
+
+    private final ServerFactoryBeanConfigurator serverFactoryBeanConfigurator;
+    private final JaxWsServiceConfigurator jaxwsServiceConfigurator;
+    private Server server;
+    private EndpointImpl endpoint;
+
+    private int simplePort;
+    private int jaxWsPort;
+
+    public CXFEmbeddedServerService(ServerFactoryBeanConfigurator serverFactoryBeanConfigurator, JaxWsServiceConfigurator jaxwsServiceConfigurator) {
+        this.serverFactoryBeanConfigurator = serverFactoryBeanConfigurator;
+        this.jaxwsServiceConfigurator = jaxwsServiceConfigurator;
+    }
+
+    @Override
+    public String getSimpleServerAddress() {
+        return String.format("http://%s:%d/%s/simpletest", NetworkUtils.getHostname(), simplePort, getClass().getSimpleName());
+    }
+
+    @Override
+    public String getJaxWsServerAddress() {
+        return String.format("http://%s:%d/%s/jaxwstest", NetworkUtils.getHostname(), jaxWsPort, getClass().getSimpleName());
+    }
+
+    public void configure() {
+        simplePort = NetworkUtils.getFreePort();
+        jaxWsPort = NetworkUtils.getFreePort();
+
+        ServerFactoryBean svrBean = new ServerFactoryBean();
+
+        // start a simple front service
+        svrBean.setAddress(getSimpleServerAddress());
+
+        serverFactoryBeanConfigurator.configure(svrBean);
+        svrBean.setBus(BusFactory.getDefaultBus());
+
+        server = svrBean.create();
+        server.getEndpoint().getInInterceptors().add(new LoggingInInterceptor());
+        server.getEndpoint().getOutInterceptors().add(new LoggingOutInterceptor());
+
+        // start a jaxws front service from the custom configurator
+        endpoint = jaxwsServiceConfigurator.configureEndpoint(getJaxWsServerAddress());
+
+        endpoint.getInInterceptors().add(new LoggingInInterceptor());
+        endpoint.getOutInterceptors().add(new LoggingOutInterceptor());
+
+        TestUtils.waitFor(() -> endpoint.isPublished());
+    }
+
+    @Override
+    public void registerProperties() {
+        System.setProperty(CXFProperties.SIMPLE_SERVER_ADDRESS, getSimpleServerAddress());
+        System.setProperty(CXFProperties.JAXWS_SERVER_ADDRESS, getJaxWsServerAddress());
+    }
+
+    @Override
+    public void initialize() {
+        LOG.info("Trying to start the CXF embedded server");
+        configure();
+
+        registerProperties();
+
+        LOG.info("CXF simple service running at {}", getSimpleServerAddress());
+        LOG.info("CXF JAX WS service running at {}", getJaxWsServerAddress());
+    }
+
+    @Override
+    public void shutdown() {
+        LOG.info("Stopping the CXF embedded server");
+
+        endpoint.stop();
+        server.stop();
+        server.destroy();
+    }
+}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/CXFService.java
similarity index 52%
copy from tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
copy to tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/CXFService.java
index 9981604..b89f043 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/CXFService.java
@@ -14,17 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.camel.kafkaconnector.cxf.services;
 
-package org.apache.camel.kafkaconnector.cxf.sink;
+import org.apache.camel.test.infra.common.services.TestService;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
 
-import java.util.logging.Logger;
+/**
+ * Test infra service for CXF
+ */
+public interface CXFService extends BeforeEachCallback, AfterEachCallback, TestService {
+
+    String getSimpleServerAddress();
 
-public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
+    String getJaxWsServerAddress();
 
-    private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
+    @Override
+    default void beforeEach(ExtensionContext extensionContext) throws Exception {
+        initialize();
+    }
 
-    public String greetMe(String hi) {
-        LOG.info("jaxws greetMe " + hi);
-        return "Greet " + hi;
+    @Override
+    default void afterEach(ExtensionContext extensionContext) throws Exception {
+        shutdown();
     }
 }
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/JaxWsServiceConfigurator.java
similarity index 68%
copy from tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
copy to tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/JaxWsServiceConfigurator.java
index 9981604..47c5d09 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/JaxWsServiceConfigurator.java
@@ -14,17 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.camel.kafkaconnector.cxf.services;
 
-package org.apache.camel.kafkaconnector.cxf.sink;
+import org.apache.cxf.jaxws.EndpointImpl;
 
-import java.util.logging.Logger;
-
-public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
-
-    private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
-
-    public String greetMe(String hi) {
-        LOG.info("jaxws greetMe " + hi);
-        return "Greet " + hi;
-    }
+public interface JaxWsServiceConfigurator {
+    EndpointImpl configureEndpoint(String address);
 }
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/ServerFactoryBeanConfigurator.java
similarity index 68%
copy from tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
copy to tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/ServerFactoryBeanConfigurator.java
index 9981604..2dfa1c2 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/ServerFactoryBeanConfigurator.java
@@ -14,17 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.camel.kafkaconnector.cxf.services;
 
-package org.apache.camel.kafkaconnector.cxf.sink;
+import org.apache.cxf.frontend.ServerFactoryBean;
 
-import java.util.logging.Logger;
-
-public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
-
-    private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
-
-    public String greetMe(String hi) {
-        LOG.info("jaxws greetMe " + hi);
-        return "Greet " + hi;
-    }
+public interface ServerFactoryBeanConfigurator {
+    void configure(ServerFactoryBean serverFactoryBean);
 }
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
index 830f804..68bb9e7 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
@@ -17,101 +17,69 @@
 
 package org.apache.camel.kafkaconnector.cxf.sink;
 
-import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 
-import javax.xml.ws.Endpoint;
-
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.apache.camel.kafkaconnector.cxf.source.HelloService;
-import org.apache.cxf.BusFactory;
-import org.apache.cxf.endpoint.Server;
-import org.apache.cxf.ext.logging.LoggingInInterceptor;
-import org.apache.cxf.ext.logging.LoggingOutInterceptor;
-import org.apache.cxf.frontend.ServerFactoryBean;
-import org.apache.cxf.jaxws.EndpointImpl;
-import org.junit.jupiter.api.AfterEach;
+import org.apache.camel.kafkaconnector.cxf.services.CXFEmbeddedServerService;
+import org.apache.camel.kafkaconnector.cxf.services.CXFService;
+import org.apache.camel.kafkaconnector.cxf.services.JaxWsServiceConfigurator;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public class CamelSinkCXFITCase extends AbstractKafkaTest {
-    protected static final String ECHO_OPERATION = "echo";
-    protected static final String GREET_ME_OPERATION = "greetMe";
     protected static final String TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
             + "<soap:Body><ns1:echo xmlns:ns1=\"http://source.cxf.kafkaconnector.camel.apache.org/\">"
             + "<arg0 xmlns=\"http://source.cxf.kafkaconnector.camel.apache.org/\">hello world</arg0>"
             + "</ns1:echo></soap:Body></soap:Envelope>";
+
     protected static final String JAXWS_TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">\"\n"
             + "        + \"<soap:Body><ns1:greetMe xmlns:ns1=\"http://apache.org/hello_world_soap_http/types\">\"\n"
             + "        + \"<requestType xmlns=\"http://apache.org/hello_world_soap_http/types\">hello world!</requestType>\"\n"
             + "        + \"</ns1:greetMe></soap:Body></soap:Envelope>";
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkCXFITCase.class);
+    
+    private SinkServerFactoryBeanConfigurator serverFactoryBeanConfigurator = new SinkServerFactoryBeanConfigurator();
+    private JaxWsServiceConfigurator jaxWsServiceConfigurator = new SinkJaxWsServiceConfigurator();
+
+    @RegisterExtension
+    public CXFService service = new CXFEmbeddedServerService(serverFactoryBeanConfigurator, jaxWsServiceConfigurator);
 
-    protected Server server;
-    protected EndpointImpl endpoint;
 
-    private final int simplePort = NetworkUtils.getFreePort();
-    private final int jaxwsPort = NetworkUtils.getFreePort();
 
     private final int expect = 10;
+    private String topicName;
 
     @Override
     protected String[] getConnectorsInTest() {
-        return new String[] {"camel-cxf-kafka-connector"};
-    }
-
-    protected String getSimpleServerAddress() {
-        return "http://" + NetworkUtils.getHostname() + ":" + simplePort + "/" + getClass().getSimpleName() + "/simpletest";
-    }
-
-    protected String getJaxWsServerAddress() {
-        return "http://" + NetworkUtils.getHostname() + ":" + jaxwsPort + "/" + getClass().getSimpleName() + "/jaxwstest";
+        return new String[]{"camel-cxf-kafka-connector"};
     }
 
     @BeforeEach
-    public void setUp() throws IOException {
-        // start a simple front service
-        ServerFactoryBean svrBean = new ServerFactoryBean();
-        svrBean.setAddress(getSimpleServerAddress());
-        svrBean.setServiceClass(HelloService.class);
-        svrBean.setServiceBean(new HelloServiceImpl());
-        svrBean.setBus(BusFactory.getDefaultBus());
-        server = svrBean.create();
-        server.getEndpoint().getInInterceptors().add(new LoggingInInterceptor());
-        server.getEndpoint().getOutInterceptors().add(new LoggingOutInterceptor());
-        // start a jaxws front service
-        GreeterImpl greeterImpl = new GreeterImpl();
-        endpoint = (EndpointImpl) Endpoint.publish(getJaxWsServerAddress(), greeterImpl);
-        endpoint.getInInterceptors().add(new LoggingInInterceptor());
-        endpoint.getOutInterceptors().add(new LoggingOutInterceptor());
-    }
-
-    @AfterEach
-    public void tearDown() {
-        endpoint.stop();
-        server.stop();
-        server.destroy();
+    public void setUp() {
+        topicName = getTopicForTest(this);
+        GreeterImpl.outputFile().delete();
     }
 
-    private void putRecords(String message) {
+    private void putRecords(String message, int count) {
         KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
 
-        for (int i = 0; i < expect; i++) {
+        for (int i = 0; i < count; i++) {
             try {
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), message);
+                kafkaClient.produce(topicName, message);
             } catch (ExecutionException e) {
                 LOG.error("Unable to produce messages: {}", e.getMessage(), e);
             } catch (InterruptedException e) {
@@ -120,29 +88,35 @@ public class CamelSinkCXFITCase extends AbstractKafkaTest {
         }
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String message)
+    public String getJaxwsEndpointUri() {
+        return service.getJaxWsServerAddress() + "?serviceClass=org.apache.hello_world_soap_http.Greeter";
+    }
+
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String message, int count)
             throws ExecutionException, InterruptedException, TimeoutException {
         connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
         getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
         ExecutorService service = Executors.newCachedThreadPool();
-        Runnable r = () -> this.putRecords(message);
+        Runnable r = () -> this.putRecords(message, count);
         service.submit(r);
         Thread.sleep(5000);
         LOG.debug("Created the consumer ... About to receive messages");
-
     }
 
     @Test
-    @Timeout(90)
     public void testBasicSendReceiveUsingUrl() {
         try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory
+                    .basic()
+                    .withName("CamelCXFSinkConnector")
+                    .withTopics(topicName)
+                    .withAddress(service.getSimpleServerAddress())
+                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService")
+                    .withDataFormat("RAW");
 
-            ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory.basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(getSimpleServerAddress())
-                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService").withDataFormat("RAW");
+            runTest(connectorPropertyFactory, TEST_MESSAGE, expect);
 
-            runTest(connectorPropertyFactory, TEST_MESSAGE);
+            assertEquals(expect, serverFactoryBeanConfigurator.getInvocationCount());
         } catch (Exception e) {
             LOG.error("CXF Sink test failed: {} {}", e.getMessage(), e);
             fail(e.getMessage(), e);
@@ -153,24 +127,19 @@ public class CamelSinkCXFITCase extends AbstractKafkaTest {
     @Timeout(90)
     public void testJaxWsBasicSendReceiveUsingUrl() {
         try {
-
-            ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory.basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(this.getJaxwsEndpointUri())
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory
+                    .basic()
+                    .withName("CamelCXFSinkConnectorUrl")
+                    .withTopics(topicName)
+                    .withAddress(getJaxwsEndpointUri())
                     .withDataFormat("RAW");
 
-            runTest(connectorPropertyFactory, JAXWS_TEST_MESSAGE);
+            runTest(connectorPropertyFactory, JAXWS_TEST_MESSAGE, 1);
+
+            assertTrue(GreeterImpl.outputFile().exists(), "The test output file was not created");
         } catch (Exception e) {
             LOG.error("CXF Sink test failed: {} {}", e.getMessage(), e);
             fail(e.getMessage(), e);
         }
     }
-
-    protected String getSimpleEndpointUri() {
-        return getSimpleServerAddress() + "?serviceClass=org.apache.camel.kafkaconnector.cxf.source.HelloService";
-    }
-
-    protected String getJaxwsEndpointUri() {
-        return getJaxWsServerAddress() + "?serviceClass=org.apache.hello_world_soap_http.Greeter";
-    }
-
 }
\ No newline at end of file
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java
index 3e814df..f346686 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java
@@ -44,7 +44,8 @@ final class CamelSinkCXFPropertyFactory extends SinkConnectorPropertyFactory<Cam
     }
 
     public static CamelSinkCXFPropertyFactory basic() {
-        return new CamelSinkCXFPropertyFactory().withTasksMax(1).withName("CamelCXFSinkConnector")
+        return new CamelSinkCXFPropertyFactory()
+                .withTasksMax(1)
                 .withConnectorClass("org.apache.camel.kafkaconnector.cxf.CamelCxfSinkConnector")
                 .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
                 .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
index 9981604..bccef12 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
@@ -17,14 +17,30 @@
 
 package org.apache.camel.kafkaconnector.cxf.sink;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.logging.Logger;
 
 public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
-
     private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
 
     public String greetMe(String hi) {
-        LOG.info("jaxws greetMe " + hi);
+        File outputFile = outputFile();
+
+        try {
+            outputFile.createNewFile();
+            LOG.info("jaxws greetMe " + hi);
+
+        } catch (IOException e) {
+            LOG.warning("Failed to create result test file");
+        }
+
         return "Greet " + hi;
     }
+
+    public static File outputFile() {
+        String path = GreeterImpl.class.getResource(".").getFile();
+
+        return new File(path, "cxf.test.result");
+    }
 }
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkJaxWsServiceConfigurator.java
similarity index 67%
copy from tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
copy to tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkJaxWsServiceConfigurator.java
index 9981604..fd7b56f 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkJaxWsServiceConfigurator.java
@@ -14,17 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.kafkaconnector.cxf.sink;
 
-import java.util.logging.Logger;
-
-public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
+import javax.xml.ws.Endpoint;
 
-    private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
+import org.apache.camel.kafkaconnector.cxf.services.JaxWsServiceConfigurator;
+import org.apache.cxf.jaxws.EndpointImpl;
 
-    public String greetMe(String hi) {
-        LOG.info("jaxws greetMe " + hi);
-        return "Greet " + hi;
+class SinkJaxWsServiceConfigurator implements JaxWsServiceConfigurator {
+    @Override
+    public EndpointImpl configureEndpoint(String address) {
+        GreeterImpl greeterImpl = new GreeterImpl();
+        return (EndpointImpl) Endpoint.publish(address, greeterImpl);
     }
 }
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java
similarity index 57%
copy from tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
copy to tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java
index 9981604..28860da 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java
@@ -14,17 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.kafkaconnector.cxf.sink;
 
-import java.util.logging.Logger;
+import org.apache.camel.kafkaconnector.cxf.services.ServerFactoryBeanConfigurator;
+import org.apache.camel.kafkaconnector.cxf.source.HelloService;
+import org.apache.cxf.frontend.ServerFactoryBean;
 
-public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
+class SinkServerFactoryBeanConfigurator implements ServerFactoryBeanConfigurator {
+    private HelloService helloService = new HelloServiceImpl();
 
-    private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
+    @Override
+    public void configure(ServerFactoryBean serverFactoryBean) {
+        serverFactoryBean.setServiceClass(HelloService.class);
+        serverFactoryBean.setServiceBean(helloService);
+    }
 
-    public String greetMe(String hi) {
-        LOG.info("jaxws greetMe " + hi);
-        return "Greet " + hi;
+    public int getInvocationCount() {
+        return helloService.getInvocationCount();
     }
 }