You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/15 10:39:06 UTC

[camel-kafka-connector] 05/10: #873 initial cxf Source/Sink connectors test (#940)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 3b6f14b3ce401a7a0353c60df620bfe5ebc20300
Author: Freeman(Yue) Fang <fr...@gmail.com>
AuthorDate: Tue Feb 9 10:34:40 2021 -0500

    #873 initial cxf Source/Sink connectors test (#940)
    
    * #873 initial cxf Source/Sink connectors test
    
    * revise according to feedback
---
 tests/itests-cxf/pom.xml                           | 100 +++++++++++
 .../cxf/sink/CamelSinkCXFITCase.java               | 189 +++++++++++++++++++++
 .../cxf/sink/CamelSinkCXFPropertyFactory.java      |  58 +++++++
 .../camel/kafkaconnector/cxf/sink/GreeterImpl.java |  30 ++++
 .../kafkaconnector/cxf/sink/HelloServiceImpl.java  |  81 +++++++++
 .../cxf/source/CamelSourceCXFITCase.java           | 181 ++++++++++++++++++++
 .../cxf/source/CamelSourceCXFPropertyFactory.java  |  64 +++++++
 .../kafkaconnector/cxf/source/HelloService.java    |  35 ++++
 8 files changed, 738 insertions(+)

diff --git a/tests/itests-cxf/pom.xml b/tests/itests-cxf/pom.xml
new file mode 100644
index 0000000..88ea1fa
--- /dev/null
+++ b/tests/itests-cxf/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.8.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-cxf</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: CXF</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-infra-common</artifactId>
+            <version>${camel.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-infra-dispatch-router</artifactId>
+            <version>${camel.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-cxf</artifactId>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-transports-http-jetty</artifactId>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-util</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-io</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-security</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-continuation</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-http</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-testutils</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+
+</project>
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
new file mode 100644
index 0000000..61c01c1
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.sink;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
+import javax.xml.ws.Endpoint;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.cxf.source.HelloService;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.ext.logging.LoggingInInterceptor;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.frontend.ServerFactoryBean;
+import org.apache.cxf.jaxws.EndpointImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CamelSinkCXFITCase extends AbstractKafkaTest {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkCXFITCase.class);
+      
+
+    private final int expect = 10;
+    
+    private final int simplePort = NetworkUtils.getFreePort("localhost");
+    private final int jaxwsPort = NetworkUtils.getFreePort("localhost");
+
+    protected static final String ECHO_OPERATION = "echo";
+    protected static final String GREET_ME_OPERATION = "greetMe";
+    protected static final String TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+        + "<soap:Body><ns1:echo xmlns:ns1=\"http://source.cxf.kafkaconnector.camel.apache.org/\">"
+        + "<arg0 xmlns=\"http://source.cxf.kafkaconnector.camel.apache.org/\">hello world</arg0>"
+        + "</ns1:echo></soap:Body></soap:Envelope>";
+    protected static final String JAXWS_TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">\"\n" + 
+        "        + \"<soap:Body><ns1:greetMe xmlns:ns1=\"http://apache.org/hello_world_soap_http/types\">\"\n" + 
+        "        + \"<requestType xmlns=\"http://apache.org/hello_world_soap_http/types\">hello world!</requestType>\"\n" + 
+        "        + \"</ns1:greetMe></soap:Body></soap:Envelope>";
+
+    protected Server server;
+    protected EndpointImpl endpoint;
+    
+    
+
+    
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-cxf-kafka-connector"};
+    }
+
+    protected String getSimpleServerAddress() {
+        return "http://localhost:" + simplePort + "/" + getClass().getSimpleName() + "/simpletest";
+    }
+
+    protected String getJaxWsServerAddress() {
+        return "http://localhost:" + jaxwsPort + "/" + getClass().getSimpleName() + "/jaxwstest";
+    }
+
+    
+    @BeforeEach
+    public void setUp() throws IOException {
+        // start a simple front service
+        ServerFactoryBean svrBean = new ServerFactoryBean();
+        svrBean.setAddress(getSimpleServerAddress());
+        svrBean.setServiceClass(HelloService.class);
+        svrBean.setServiceBean(new HelloServiceImpl());
+        svrBean.setBus(BusFactory.getDefaultBus());
+        server = svrBean.create();
+        server.getEndpoint().getInInterceptors().add(new LoggingInInterceptor());
+        server.getEndpoint().getOutInterceptors().add(new LoggingOutInterceptor());
+        // start a jaxws front service
+        GreeterImpl greeterImpl = new GreeterImpl();
+        endpoint = (EndpointImpl)Endpoint.publish(getJaxWsServerAddress(), greeterImpl);
+        endpoint.getInInterceptors().add(new LoggingInInterceptor());
+        endpoint.getOutInterceptors().add(new LoggingOutInterceptor());
+    }
+
+    @AfterEach
+    public void tearDown() {
+        endpoint.stop();
+        server.stop();
+        server.destroy();
+    }
+
+
+    private void putRecords(String message) {
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        for (int i = 0; i < expect; i++) {
+            try {
+                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), message);
+            } catch (ExecutionException e) {
+                LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+            } catch (InterruptedException e) {
+                break;
+            } 
+        }
+    }
+
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String message) throws ExecutionException, InterruptedException, TimeoutException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+        ExecutorService service = Executors.newCachedThreadPool();
+        Runnable r = () -> this.putRecords(message);
+        service.submit(r);
+        Thread.sleep(5000);
+        LOG.debug("Created the consumer ... About to receive messages");
+                
+    }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceiveUsingUrl() {
+        try {
+            
+
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory.basic()
+                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withAddress(getSimpleServerAddress())
+                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService")
+                    .withDataFormat("RAW");
+
+            runTest(connectorPropertyFactory, TEST_MESSAGE);
+        } catch (Exception e) {
+            LOG.error("CXF Sink test failed: {} {}", e.getMessage(), e);
+            fail(e.getMessage(), e);
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testJaxWsBasicSendReceiveUsingUrl() {
+        try {
+            
+
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory.basic()
+                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withAddress(this.getJaxwsEndpointUri())
+                    .withDataFormat("RAW");
+
+            runTest(connectorPropertyFactory, JAXWS_TEST_MESSAGE);
+        } catch (Exception e) {
+            LOG.error("CXF Sink test failed: {} {}", e.getMessage(), e);
+            fail(e.getMessage(), e);
+        }
+    }
+    
+    protected String getSimpleEndpointUri() {
+        return getSimpleServerAddress()
+               + "?serviceClass=org.apache.camel.kafkaconnector.cxf.source.HelloService";
+    }
+
+    protected String getJaxwsEndpointUri() {
+        return getJaxWsServerAddress() + "?serviceClass=org.apache.hello_world_soap_http.Greeter";
+    }
+
+    
+}
\ No newline at end of file
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java
new file mode 100644
index 0000000..e7ed6a7
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+
+final class CamelSinkCXFPropertyFactory extends SinkConnectorPropertyFactory<CamelSinkCXFPropertyFactory> {
+    private CamelSinkCXFPropertyFactory() {
+
+    }
+
+    
+
+    public EndpointUrlBuilder<CamelSinkCXFPropertyFactory> withUrl(String serviceUrl) {
+        String url = String.format("cxf://%s", serviceUrl);
+
+        return new EndpointUrlBuilder<>(this::withSinkUrl, url);
+    }
+    
+    public CamelSinkCXFPropertyFactory withDataFormat(String dataFormat) {
+        return setProperty("camel.sink.endpoint.dataFormat", dataFormat);
+    }
+    
+    public CamelSinkCXFPropertyFactory withAddress(String address) {
+        return setProperty("camel.sink.path.address", address);
+    }
+    
+    public CamelSinkCXFPropertyFactory withServiceClass(String serviceClass) {
+        return setProperty("camel.sink.endpoint.serviceClass", serviceClass);
+    }
+
+    public static CamelSinkCXFPropertyFactory basic() {
+        return new CamelSinkCXFPropertyFactory()
+                .withTasksMax(1)
+                .withName("CamelCXFSinkConnector")
+                .withConnectorClass("org.apache.camel.kafkaconnector.cxf.CamelCxfSinkConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
+
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
new file mode 100644
index 0000000..a5b909d
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.sink;
+
+import java.util.logging.Logger;
+
+
+public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
+    
+    private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
+    public String greetMe(String hi) {
+        LOG.info("jaxws greetMe " + hi);
+        return "Greet " + hi;
+    }
+}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java
new file mode 100644
index 0000000..42f12f5
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.cxf.sink;
+
+import java.util.List;
+
+import org.apache.camel.kafkaconnector.cxf.source.HelloService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HelloServiceImpl implements HelloService {
+    private static final Logger LOG = LoggerFactory.getLogger(HelloServiceImpl.class);
+    public static int invocationCount = 0;
+
+    private String name;
+
+    public HelloServiceImpl(String name) {
+        this.name = name;
+    }
+
+    public HelloServiceImpl() {
+        name = "";
+    }
+
+    @Override
+    public String echo(String text) {
+        LOG.info("call for echo with " + text);
+        invocationCount++;
+        LOG.info("invocationCount is " + invocationCount);
+        return "echo " + text;
+    }
+
+    @Override
+    public void ping() {
+        invocationCount++;
+        LOG.info("call for oneway ping");
+    }
+
+    @Override
+    public int getInvocationCount() {
+        return invocationCount;
+    }
+
+    @Override
+    public String sayHello() {
+        
+        return "hello" + name;
+    }
+
+    @Override
+    public Boolean echoBoolean(Boolean bool) {
+        LOG.info("call for echoBoolean with " + bool);
+        invocationCount++;
+        LOG.info("invocationCount is " + invocationCount);
+        return bool;
+    }
+
+    @Override
+    public String complexParameters(List<String> par1, List<String> par2) {
+        String result = "param";
+        if (par1 != null && par2 != null) {
+            result = result + ":" + par1.get(0) + par2.get(0);
+        }
+        return result;
+    }
+
+}
\ No newline at end of file
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
new file mode 100644
index 0000000..4ddf9e8
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.source;
+
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingInInterceptor;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.frontend.ClientFactoryBean;
+import org.apache.cxf.frontend.ClientProxyFactoryBean;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+/**
+ * A simple test case that checks whether the CXF Consumer Endpoint produces the expected number of
+ * messages
+ */
+public class CamelSourceCXFITCase extends AbstractKafkaTest {
+    
+    protected static final int PORT = NetworkUtils.getFreePort("localhost");
+    protected static final String SIMPLE_ENDPOINT_ADDRESS = "http://localhost:" + PORT
+        + "/CxfConsumerTest/test";
+    protected static final String SIMPLE_ENDPOINT_URI =  SIMPLE_ENDPOINT_ADDRESS
+        + "?serviceClass=org.apache.camel.kafkaconnector.cxf.source.HelloService"
+        + "&publishedEndpointUrl=http://www.simple.com/services/test";
+
+    
+    private static final String TEST_MESSAGE = "Hello World!";
+    
+
+    
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceCXFITCase.class);
+
+    private int received;
+    private final int expect = 1;
+    
+
+    
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-cxf-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        received = 0;
+        
+    }
+
+    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
+        LOG.debug("Received: {}", record.value());
+        
+        received++;
+
+        if (received == expect) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+
+    public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+        Thread.sleep(5000);//ensure cxf source connector is up
+        ClientProxyFactoryBean proxyFactory = new ClientProxyFactoryBean();
+        ClientFactoryBean clientBean = proxyFactory.getClientFactoryBean();
+        clientBean.setAddress(SIMPLE_ENDPOINT_ADDRESS);
+        clientBean.setServiceClass(HelloService.class);
+        Bus bus = BusFactory.newInstance().createBus();
+        clientBean.setBus(bus);
+        bus.getInInterceptors().add(new LoggingInInterceptor());
+        bus.getOutInterceptors().add(new LoggingOutInterceptor());
+        HelloService client = (HelloService) proxyFactory.create();
+        try {
+            String result = client.echo(TEST_MESSAGE);
+            assertEquals(result, TEST_MESSAGE);
+        } catch (Exception e) {
+            LOG.info("Test Invocation Failure", e);
+        }
+        
+        
+        LOG.debug("Creating the consumer ...");
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
+        LOG.debug("Created the consumer ...");
+
+        assertEquals(received, expect, "Didn't process the expected amount of messages");
+    }
+
+    
+
+    @Test
+    @Timeout(20)
+    public void testBasicSendReceive() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory
+                    .basic()
+                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withAddress(SIMPLE_ENDPOINT_ADDRESS)
+                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService");
+                                        
+
+            runBasicStringTest(connectorPropertyFactory);
+        } catch (Exception e) {
+            LOG.error("CXF test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+    
+    @Test
+    @Timeout(20)
+    public void testBasicSendReceiveUsingUrl() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory
+                    .basic()
+                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withUrl(SIMPLE_ENDPOINT_URI).buildUrl();
+                    
+
+            runBasicStringTest(connectorPropertyFactory);
+        } catch (Exception e) {
+            LOG.error("CXF test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+
+    
+    @Test
+    @Timeout(20)
+    public void testBasicSendReceiveUsingDataFormat() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory
+                .basic()
+                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withAddress(SIMPLE_ENDPOINT_ADDRESS)
+                .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService")
+                .withDataFormat("POJO");
+                    
+
+            runBasicStringTest(connectorPropertyFactory);
+        } catch (Exception e) {
+            LOG.error("CXF test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+
+    
+}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFPropertyFactory.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFPropertyFactory.java
new file mode 100644
index 0000000..7d054e5
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFPropertyFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.source;
+
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+
+/**
+ * Creates the set of properties used by a Camel CXF Source Connector
+ */
+final class CamelSourceCXFPropertyFactory extends SourceConnectorPropertyFactory<CamelSourceCXFPropertyFactory> {
+    private CamelSourceCXFPropertyFactory() {
+
+    }
+
+    public CamelSourceCXFPropertyFactory withAddress(String address) {
+        return setProperty("camel.source.path.address", address);
+    }
+    
+    public CamelSourceCXFPropertyFactory withServiceClass(String serviceClass) {
+        return setProperty("camel.source.endpoint.serviceClass", serviceClass);
+    }
+    
+    public CamelSourceCXFPropertyFactory withPublishedEndpointUrl(String publishedEndpointUrl) {
+        return setProperty("camel.source.endpoint.publishedEndpointUrl", publishedEndpointUrl);
+    }
+    
+    public CamelSourceCXFPropertyFactory withDataFormat(String dataFormat) {
+        return setProperty("camel.source.endpoint.dataFormat", dataFormat);
+    }
+        
+    public static CamelSourceCXFPropertyFactory basic() {
+        return new CamelSourceCXFPropertyFactory()
+                .withName("CamelCXFSourceConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.cxf.CamelCxfSourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+
+    public EndpointUrlBuilder<CamelSourceCXFPropertyFactory> withUrl(String cxfUrl) {
+        String url = String.format("cxf://%s", cxfUrl);
+        return new EndpointUrlBuilder<>(this::withSourceUrl, url);
+    }
+
+    
+}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java
new file mode 100644
index 0000000..5c4653f
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.source;
+
+import java.util.List;
+
+public interface HelloService {
+    String sayHello();
+
+    void ping();
+
+    int getInvocationCount();
+
+    String echo(String text) throws Exception;
+
+    Boolean echoBoolean(Boolean bool);
+
+    String complexParameters(List<String> par1, List<String> par2);
+
+}