You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/15 10:39:01 UTC

[camel-kafka-connector] branch camel-master updated (087dfb5 -> 56ddf9e)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from 087dfb5  [create-pull-request] automated change
     new 9a156c3  Avoid creating everything from scratch for every Cassandra sink test
     new 95963aa  Avoid blocking connector initialization on SSH tests as it leads to failures on GH actions
     new abc05f2  Give another run on test failures to rule out false negatives due to slow CI environments
     new dd5ce04  Rework the syslog integration tests to let JUnit handle camel's lifecycle in the test
     new 3b6f14b  #873 initial cxf Source/Sink connectors test (#940)
     new 09df84f  Cleanup the check state logic on the KafkaConnectEmbeddedService
     new d3adf24  Set the port configuration to avoid the Jetty port error when creating the connect instance
     new ab56f43  Removed incorrect log message about connector initialization
     new 2b0bda2  Delete the connector before deleting the test topics
     new 56ddf9e  Avoid spawning Jetty servers too quickly as they seem to cause GH actions to fail

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci-build.yml                     |   1 +
 .github/workflows/daily-java-next.yml              |   1 +
 Jenkinsfile.jdk11                                  |   2 +-
 .../cassandra/sink/CamelSinkCassandraITCase.java   |  13 +-
 .../services/kafka/EmbeddedKafkaService.java       |   2 +
 .../kafkaconnect/KafkaConnectEmbedded.java         |  17 +-
 .../common/test/CamelSinkTestSupport.java          |  25 +++
 .../common/test/CamelSourceTestSupport.java        |   2 -
 tests/{itests-hdfs => itests-cxf}/pom.xml          |  48 ++++--
 .../cxf/sink/CamelSinkCXFITCase.java               | 189 +++++++++++++++++++++
 .../cxf/sink/CamelSinkCXFPropertyFactory.java}     |  36 ++--
 .../kafkaconnector/cxf/sink/GreeterImpl.java}      |  13 +-
 .../kafkaconnector/cxf/sink/HelloServiceImpl.java  |  81 +++++++++
 .../cxf/source/CamelSourceCXFITCase.java           | 181 ++++++++++++++++++++
 .../cxf/source/CamelSourceCXFPropertyFactory.java} |  44 +++--
 .../kafkaconnector/cxf/source/HelloService.java}   |  18 +-
 .../ssh/sink/CamelSinkSshITCase.java               |   2 +-
 .../syslog/services/RouteConfigurator.java}        |   9 +-
 ...slogService.java => SinkRouteConfigurator.java} |  41 ++---
 .../syslog/services/SourceRouteConfigurator.java   |  54 ++++++
 .../syslog/services/SyslogService.java             |  49 +++---
 .../syslog/sink/CamelSinkSyslogITCase.java         |  19 ++-
 .../syslog/source/CamelSourceSyslogITCase.java     |  64 +++----
 23 files changed, 752 insertions(+), 159 deletions(-)
 copy tests/{itests-hdfs => itests-cxf}/pom.xml (65%)
 create mode 100644 tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
 copy tests/{itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelHTTPPropertyFactory.java => itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java} (56%)
 copy tests/{itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/common/SshProperties.java => itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java} (69%)
 create mode 100644 tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java
 create mode 100644 tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
 copy tests/{itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelJMSPropertyFactory.java => itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFPropertyFactory.java} (50%)
 copy tests/{itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageConsumer.java => itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java} (72%)
 copy tests/{itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/FunctionalTestMessageProducer.java => itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/RouteConfigurator.java} (80%)
 copy tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/{SyslogService.java => SinkRouteConfigurator.java} (51%)
 create mode 100644 tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SourceRouteConfigurator.java


[camel-kafka-connector] 08/10: Removed incorrect log message about connector initialization

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit ab56f43f1b743ba23c68eabedf618e180cc16019
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 15 10:35:45 2021 +0100

    Removed incorrect log message about connector initialization
---
 .../apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
index 7f8b03c..88f278d 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -70,7 +70,6 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
     public void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer,
                         FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
-        LOG.debug("Initialized the connector and put the data for the test execution");
         getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
         LOG.debug("Producing test data to be collected by the connector and sent to Kafka");
@@ -107,7 +106,6 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
     public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer,
                         FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
-        LOG.debug("Initialized the connector and put the data for the test execution");
         getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
 
         LOG.debug("Producing test data to be collected by the connector and sent to Kafka");


[camel-kafka-connector] 04/10: Rework the syslog integration tests to let JUnit handle camel's lifecycle in the test

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit dd5ce04329c4601ae32027971335d0f051c27ed3
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Feb 12 16:11:18 2021 +0100

    Rework the syslog integration tests to let JUnit handle camel's lifecycle in the test
    
    This fixes a few false negatives related to start/stop the CamelContext in the test
---
 .../syslog/services/RouteConfigurator.java         | 24 ++++++++++
 ...slogService.java => SinkRouteConfigurator.java} | 41 +++++++---------
 .../syslog/services/SourceRouteConfigurator.java   | 54 ++++++++++++++++++++++
 .../syslog/services/SyslogService.java             | 49 +++++++++++---------
 .../syslog/sink/CamelSinkSyslogITCase.java         | 19 ++++++--
 .../syslog/source/CamelSourceSyslogITCase.java     | 47 +++----------------
 6 files changed, 145 insertions(+), 89 deletions(-)

diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/RouteConfigurator.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/RouteConfigurator.java
new file mode 100644
index 0000000..4b20f72
--- /dev/null
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/RouteConfigurator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.syslog.services;
+
+import org.apache.camel.CamelContext;
+
+public interface RouteConfigurator {
+    void configure(CamelContext camelContext) throws Exception;
+}
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SinkRouteConfigurator.java
similarity index 51%
copy from tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java
copy to tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SinkRouteConfigurator.java
index 3bc07b3..83f1a30 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SinkRouteConfigurator.java
@@ -14,49 +14,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.camel.kafkaconnector.syslog.services;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.syslog.SyslogDataFormat;
 import org.apache.camel.component.syslog.netty.Rfc5425FrameDecoder;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.junit.jupiter.api.extension.AfterAllCallback;
-import org.junit.jupiter.api.extension.BeforeAllCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class SyslogService implements BeforeAllCallback, AfterAllCallback {
-    private static final CamelContext CAMEL_CONTEXT = new DefaultCamelContext();
+public class SinkRouteConfigurator implements RouteConfigurator {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkRouteConfigurator.class);
 
-    private static String protocol;
-    private static String host;
-    private static int port;
+    private final String protocol;
+    private final String host;
+    private final int port;
 
-    public SyslogService(String protocol, String host, int port) {
+    public SinkRouteConfigurator(String protocol, String host, int port) {
         this.protocol = protocol;
         this.host = host;
         this.port = port;
     }
 
     @Override
-    public void beforeAll(ExtensionContext context) throws Exception {
-        CAMEL_CONTEXT.getRegistry().bind("decoder", new Rfc5425FrameDecoder());
-        CAMEL_CONTEXT.addRoutes(new RouteBuilder() {
+    public void configure(CamelContext camelContext) throws Exception {
+        camelContext.getRegistry().bind("decoder", new Rfc5425FrameDecoder());
+
+        LOG.debug("Adding routes");
+        camelContext.addRoutes(new RouteBuilder() {
             @Override
             public void configure() {
-                from("netty:" + protocol + ":" + host + ":" + port + "?sync=false&decoders=#decoder").unmarshal(new SyslogDataFormat()).to("seda:syslog");
+                fromF("netty:%s://%s:%d?sync=false&decoders=#decoder", protocol, host, port)
+                        .unmarshal(new SyslogDataFormat()).to("seda:syslog");
             }
         });
-        CAMEL_CONTEXT.start();
-    }
-
-    @Override
-    public void afterAll(ExtensionContext context) {
-        CAMEL_CONTEXT.stop();
-    }
-
-    public Exchange getFirstExchangeToBeReceived() {
-        return CAMEL_CONTEXT.createConsumerTemplate().receive("seda:syslog", 10000L);
     }
 }
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SourceRouteConfigurator.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SourceRouteConfigurator.java
new file mode 100644
index 0000000..a088940
--- /dev/null
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SourceRouteConfigurator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.syslog.services;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.syslog.SyslogDataFormat;
+import org.apache.camel.component.syslog.netty.Rfc5425Encoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SourceRouteConfigurator implements RouteConfigurator {
+    private static final Logger LOG = LoggerFactory.getLogger(SourceRouteConfigurator.class);
+
+    private final String protocol;
+    private final String host;
+    private final int port;
+
+    public SourceRouteConfigurator(String protocol, String host, int port) {
+        this.protocol = protocol;
+        this.host = host;
+        this.port = port;
+    }
+
+    @Override
+    public void configure(CamelContext camelContext) throws Exception {
+        camelContext.getRegistry().bind("encoder", new Rfc5425Encoder());
+
+        LOG.debug("Adding routes");
+        camelContext.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:test")
+                        .marshal(new SyslogDataFormat())
+                        .toF("netty:%s://%s:%d?sync=false&encoders=#encoder&useByteBuf=true", protocol, host, port);
+            }
+        });
+    }
+}
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java
index 3bc07b3..53195ca 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/services/SyslogService.java
@@ -18,45 +18,52 @@ package org.apache.camel.kafkaconnector.syslog.services;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.syslog.SyslogDataFormat;
-import org.apache.camel.component.syslog.netty.Rfc5425FrameDecoder;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.infra.common.TestUtils;
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 
 public class SyslogService implements BeforeAllCallback, AfterAllCallback {
-    private static final CamelContext CAMEL_CONTEXT = new DefaultCamelContext();
+    private final CamelContext camelContext = new DefaultCamelContext();
 
-    private static String protocol;
-    private static String host;
-    private static int port;
+    private final RouteConfigurator routeConfigurator;
 
-    public SyslogService(String protocol, String host, int port) {
-        this.protocol = protocol;
-        this.host = host;
-        this.port = port;
+    public SyslogService(RouteConfigurator routeConfigurator) {
+        this.routeConfigurator = routeConfigurator;
     }
 
     @Override
     public void beforeAll(ExtensionContext context) throws Exception {
-        CAMEL_CONTEXT.getRegistry().bind("decoder", new Rfc5425FrameDecoder());
-        CAMEL_CONTEXT.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() {
-                from("netty:" + protocol + ":" + host + ":" + port + "?sync=false&decoders=#decoder").unmarshal(new SyslogDataFormat()).to("seda:syslog");
-            }
-        });
-        CAMEL_CONTEXT.start();
+        routeConfigurator.configure(camelContext);
+
+        camelContext.start();
+        TestUtils.waitFor(camelContext::isStarted);
     }
 
     @Override
     public void afterAll(ExtensionContext context) {
-        CAMEL_CONTEXT.stop();
+        camelContext.stop();
+        TestUtils.waitFor(camelContext::isStopped);
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
     }
 
     public Exchange getFirstExchangeToBeReceived() {
-        return CAMEL_CONTEXT.createConsumerTemplate().receive("seda:syslog", 10000L);
+        return camelContext.createConsumerTemplate().receive("seda:syslog", 10000L);
+    }
+
+    public static SyslogService sinkSyslogServiceFactory(String protocol, String host, int port) {
+        SinkRouteConfigurator sinkRouteConfigurator = new SinkRouteConfigurator(protocol, host, port);
+
+        return new SyslogService(sinkRouteConfigurator);
+    }
+
+    public static SyslogService sourceSyslogServiceFactory(String protocol, String host, int port) {
+        SourceRouteConfigurator sourceRouteConfigurator = new SourceRouteConfigurator(protocol, host, port);
+
+        return new SyslogService(sourceRouteConfigurator);
     }
 }
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
index 78eb2f4..a2620dc 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
@@ -19,6 +19,8 @@ package org.apache.camel.kafkaconnector.syslog.sink;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
@@ -31,6 +33,7 @@ import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.fail;
 
 
@@ -40,11 +43,13 @@ import static org.junit.jupiter.api.Assertions.fail;
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
-    private static final int FREE_PORT = NetworkUtils.getFreePort("localhost", NetworkUtils.Protocol.UDP);
+    private static final String HOST = "localhost";
+    private static final String PROTOCOL = "udp";
+    private static final int FREE_PORT = NetworkUtils.getFreePort(HOST, NetworkUtils.Protocol.UDP);
     private static final String TEST_TXT = "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!";
 
     @RegisterExtension
-    public static SyslogService syslogService = new SyslogService("udp", "//localhost", FREE_PORT);
+    public static SyslogService service = SyslogService.sinkSyslogServiceFactory(PROTOCOL, HOST, FREE_PORT);
 
     private String topicName;
     private final int expect = 1;
@@ -79,7 +84,15 @@ public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
     @Override
     protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
         if (latch.await(30, TimeUnit.SECONDS)) {
-            assertEquals(TEST_TXT, syslogService.getFirstExchangeToBeReceived().getIn().getBody(String.class));
+            Exchange exchange = service.getFirstExchangeToBeReceived();
+            assertNotNull(exchange, "There should have been an exchange received");
+            Message message = exchange.getIn();
+            assertNotNull(message, "There should have been a message in the exchange");
+
+            String body = message.getBody(String.class);
+            assertNotNull(body, "The message body should not be null");
+            assertEquals(TEST_TXT, message.getBody(String.class),
+                    "The received message body does not match the expected message");
         } else {
             fail("Timed out wait for data to be added to the Kafka cluster");
         }
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
index 7f14a2e..3a14ac2 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
@@ -19,26 +19,18 @@ package org.apache.camel.kafkaconnector.syslog.source;
 
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.syslog.SyslogDataFormat;
-import org.apache.camel.component.syslog.netty.Rfc5425Encoder;
-import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
 import org.apache.camel.kafkaconnector.common.test.StringMessageConsumer;
 import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
+import org.apache.camel.kafkaconnector.syslog.services.SyslogService;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -49,57 +41,32 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSourceSyslogITCase extends CamelSourceTestSupport {
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSyslogITCase.class);
     private static final String HOST = "localhost";
     private static final String PROTOCOL = "udp";
     private static final int FREE_PORT = NetworkUtils.getFreePort(HOST, NetworkUtils.Protocol.UDP);
 
+    @RegisterExtension
+    public static SyslogService service = SyslogService.sourceSyslogServiceFactory(PROTOCOL, "localhost", FREE_PORT);
+
     private final int expect = 1;
-    private ConnectorPropertyFactory connectorPropertyFactory;
     private String topicName;
 
-    private CamelContext camelContext;
-
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-syslog-kafka-connector"};
     }
 
-    @BeforeAll
-    public void setupCamelContext() throws Exception {
-        LOG.debug("Creating the Camel context");
-        camelContext = new DefaultCamelContext();
-        camelContext.getRegistry().bind("encoder", new Rfc5425Encoder());
-
-        LOG.debug("Adding routes");
-        camelContext.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() {
-                from("direct:test")
-                        .marshal(new SyslogDataFormat())
-                        .toF("netty:%s://%s:%d?sync=false&encoders=#encoder&useByteBuf=true", PROTOCOL, HOST, FREE_PORT);
-            }
-        });
-    }
-
     @BeforeEach
     public void setUp() {
         topicName = getTopicForTest(this);
-
-        camelContext.start();
-        TestUtils.waitFor(camelContext::isStarted);
     }
 
-    @AfterEach
-    public void tearDown() {
-        camelContext.stop();
-    }
 
     @Override
     protected void produceTestData() {
         String message = "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!";
 
-        camelContext.createProducerTemplate().sendBody("direct:test", message);
+        service.getCamelContext().createProducerTemplate().sendBody("direct:test", message);
     }
 
     @Override
@@ -113,7 +80,7 @@ public class CamelSourceSyslogITCase extends CamelSourceTestSupport {
     @RepeatedTest(3)
     @Timeout(90)
     public void testBasicSend() throws ExecutionException, InterruptedException {
-        connectorPropertyFactory = CamelSyslogPropertyFactory
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
                 .basic()
                 .withKafkaTopic(topicName)
                 .withHost(HOST)


[camel-kafka-connector] 10/10: Avoid spawning Jetty servers too quickly as they seem to cause GH actions to fail

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 56ddf9e1e9c272199d002e38e041aee3c0be9ee2
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 15 11:03:37 2021 +0100

    Avoid spawning Jetty servers too quickly as they seem to cause GH actions to fail
---
 .../syslog/source/CamelSourceSyslogITCase.java        | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
index 3a14ac2..ab80914 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
@@ -25,11 +25,15 @@ import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
 import org.apache.camel.kafkaconnector.common.test.StringMessageConsumer;
 import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.syslog.services.SyslogService;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -77,11 +81,16 @@ public class CamelSourceSyslogITCase extends CamelSourceTestSupport {
     }
 
 
-    @RepeatedTest(3)
+
+
     @Timeout(90)
+    @Test
+    @DisabledIfSystemProperty(named = "enable.flaky.tests", matches = "true",
+            disabledReason = "Already executed with testBasicSendStress")
     public void testBasicSend() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
                 .basic()
+                .withName("CamelSyslogSourceConnector" + TestUtils.randomWithRange(0, 1000))
                 .withKafkaTopic(topicName)
                 .withHost(HOST)
                 .withPort(FREE_PORT)
@@ -92,4 +101,12 @@ public class CamelSourceSyslogITCase extends CamelSourceTestSupport {
 
         runTestBlocking(connectorPropertyFactory, stringMessageConsumer);
     }
+
+    @RepeatedTest(3)
+    @Timeout(90)
+    @EnabledIfSystemProperty(named = "enable.flaky.tests", matches = "true",
+            disabledReason = "Quickly spawning multiple Jetty Servers doesn't work well on Github Actions")
+    public void testBasicSendStress() throws ExecutionException, InterruptedException {
+        testBasicSend();
+    }
 }


[camel-kafka-connector] 03/10: Give another run on test failures to rule out false negatives due to slow CI environments

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit abc05f2217549dad00a217d6bc1a93e09890f010
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Feb 12 15:16:22 2021 +0100

    Give another run on test failures to rule out false negatives due to slow CI environments
---
 .github/workflows/ci-build.yml        | 1 +
 .github/workflows/daily-java-next.yml | 1 +
 Jenkinsfile.jdk11                     | 2 +-
 3 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml
index 27ad769..094839c 100644
--- a/.github/workflows/ci-build.yml
+++ b/.github/workflows/ci-build.yml
@@ -72,6 +72,7 @@ jobs:
       - name: build and itests
         run: |
           ./mvnw ${MAVEN_ARGS} \
+            -Dfailsafe.rerunFailingTestsCount=2 \
             -Dcheckstyle.failOnViolation=true \
             -Psourcecheck \
             -DskipIntegrationTests=false \
diff --git a/.github/workflows/daily-java-next.yml b/.github/workflows/daily-java-next.yml
index 3ee63bf..e65f126 100644
--- a/.github/workflows/daily-java-next.yml
+++ b/.github/workflows/daily-java-next.yml
@@ -69,6 +69,7 @@ jobs:
       - name: build and itests
         run: |
           ./mvnw ${MAVEN_ARGS} \
+            -Dfailsafe.rerunFailingTestsCount=2 \
             -Dcheckstyle.failOnViolation=true \
             -Psourcecheck \
             -DskipIntegrationTests=false \
diff --git a/Jenkinsfile.jdk11 b/Jenkinsfile.jdk11
index 7011e19..d9312ad 100644
--- a/Jenkinsfile.jdk11
+++ b/Jenkinsfile.jdk11
@@ -66,7 +66,7 @@ pipeline {
 
         stage('Test') {
             steps {
-                sh "./mvnw $MAVEN_PARAMS -DskipIntegrationTests=false -Denable.slow.tests=true -Dmaven.test.failure.ignore=true clean install"
+                sh "./mvnw $MAVEN_PARAMS -DskipIntegrationTests=false -Denable.slow.tests=true -Dmaven.test.failure.ignore=true -Dfailsafe.rerunFailingTestsCount=2 clean install"
             }
             post {
                 always {


[camel-kafka-connector] 02/10: Avoid blocking connector initialization on SSH tests as it leads to failures on GH actions

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 95963aa92518fdec869dd471572b28cda1b6c17c
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Feb 12 16:08:21 2021 +0100

    Avoid blocking connector initialization on SSH tests as it leads to failures on GH actions
---
 .../common/test/CamelSinkTestSupport.java          | 25 ++++++++++++++++++++++
 .../ssh/sink/CamelSinkSshITCase.java               |  2 +-
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
index b414726..ec9e9dc 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
@@ -97,6 +97,31 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
         verifyMessages(latch);
     }
 
+    /**
+     * A simple test runner that follows the steps: initialize, start consumer, produce messages, verify results
+     *
+     * @param connectorPropertyFactory A factory for connector properties
+     * @throws Exception For test-specific exceptions
+     */
+    protected void runTestNonBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageProducer producer) throws Exception {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        LOG.debug("Creating the consumer ...");
+        ExecutorService service = Executors.newCachedThreadPool();
+
+        CountDownLatch latch = new CountDownLatch(1);
+        service.submit(() -> consumeMessages(latch));
+
+        producer.produceMessages();
+
+        LOG.debug("Waiting for the messages to be processed");
+        service.shutdown();
+
+        LOG.debug("Waiting for the test to complete");
+        verifyMessages(latch);
+    }
+
 
     protected boolean waitForData() {
         try {
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
index 02f6f21..abfdccb 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -94,6 +94,6 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport {
                 .withUsername("root")
                 .withPassword("root");
 
-        runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
+        runTestNonBlocking(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
     }
 }


[camel-kafka-connector] 06/10: Cleanup the check state logic on the KafkaConnectEmbeddedService

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 09df84fe919a3e87ffad97b2be55fcbda5875150
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Feb 12 16:05:44 2021 +0100

    Cleanup the check state logic on the KafkaConnectEmbeddedService
---
 .../common/services/kafkaconnect/KafkaConnectEmbedded.java    | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
index bc6b868..52af0a5 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
@@ -64,6 +64,12 @@ public class KafkaConnectEmbedded implements KafkaConnectService {
         LOG.trace("Added the new connector");
     }
 
+    private boolean doCheckState(ConnectorStateInfo connectorStateInfo, Integer expectedTaskNumber) {
+        return connectorStateInfo.tasks().size() >= expectedTaskNumber
+                && connectorStateInfo.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+                && connectorStateInfo.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+    }
+
     @Override
     public void initializeConnectorBlocking(ConnectorPropertyFactory propertyFactory, Integer expectedTaskNumber) throws InterruptedException {
         initializeConnector(propertyFactory);
@@ -73,9 +79,8 @@ public class KafkaConnectEmbedded implements KafkaConnectService {
                 connectorStateInfo = cluster.connectorStatus(connectorName);
                 Thread.sleep(20L);
             } while (connectorStateInfo == null);
-            return  connectorStateInfo.tasks().size() >= expectedTaskNumber
-                    && connectorStateInfo.connector().state().equals(AbstractStatus.State.RUNNING.toString())
-                    && connectorStateInfo.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+
+            return doCheckState(connectorStateInfo, expectedTaskNumber);
         }, 30000L, "The connector " + connectorName + " did not start within a reasonable time");
     }
 


[camel-kafka-connector] 07/10: Set the port configuration to avoid the Jetty port error when creating the connect instance

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit d3adf240577c5633cd7683c6f6c460368a241128
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 15 09:14:17 2021 +0100

    Set the port configuration to avoid the Jetty port error when creating the connect instance
---
 .../kafkaconnector/common/services/kafka/EmbeddedKafkaService.java      | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
index 3ce25d3..a7e50bd 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
@@ -50,6 +50,8 @@ public class EmbeddedKafkaService implements KafkaService {
 
         Map<String, String> workerProps = new HashMap<>();
         workerProps.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
+        workerProps.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:9999");
+
 
         String pluginPaths = PluginPathHelper.getInstance().pluginPaths();
 


[camel-kafka-connector] 09/10: Delete the connector before deleting the test topics

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 2b0bda21a0d39e124aed346c8843fc402707a7e9
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 15 10:49:49 2021 +0100

    Delete the connector before deleting the test topics
---
 .../common/services/kafkaconnect/KafkaConnectEmbedded.java          | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
index 52af0a5..9d3d833 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
@@ -88,13 +88,13 @@ public class KafkaConnectEmbedded implements KafkaConnectService {
     public void stop() {
         if (connectorName != null) {
             try {
+                LOG.info("Removing connector {}", connectorName);
+                cluster.deleteConnector(connectorName);
+
                 LOG.info("Removing topics used during the test");
                 Admin client = cluster.kafka().createAdminClient();
 
                 client.deleteTopics(cluster.connectorTopics(connectorName).topics());
-
-                LOG.info("Removing connector {}", connectorName);
-                cluster.deleteConnector(connectorName);
             } finally {
                 connectorName = null;
             }


[camel-kafka-connector] 05/10: #873 initial cxf Source/Sink connectors test (#940)

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 3b6f14b3ce401a7a0353c60df620bfe5ebc20300
Author: Freeman(Yue) Fang <fr...@gmail.com>
AuthorDate: Tue Feb 9 10:34:40 2021 -0500

    #873 initial cxf Source/Sink connectors test (#940)
    
    * #873 initial cxf Source/Sink connectors test
    
    * revise according to feedback
---
 tests/itests-cxf/pom.xml                           | 100 +++++++++++
 .../cxf/sink/CamelSinkCXFITCase.java               | 189 +++++++++++++++++++++
 .../cxf/sink/CamelSinkCXFPropertyFactory.java      |  58 +++++++
 .../camel/kafkaconnector/cxf/sink/GreeterImpl.java |  30 ++++
 .../kafkaconnector/cxf/sink/HelloServiceImpl.java  |  81 +++++++++
 .../cxf/source/CamelSourceCXFITCase.java           | 181 ++++++++++++++++++++
 .../cxf/source/CamelSourceCXFPropertyFactory.java  |  64 +++++++
 .../kafkaconnector/cxf/source/HelloService.java    |  35 ++++
 8 files changed, 738 insertions(+)

diff --git a/tests/itests-cxf/pom.xml b/tests/itests-cxf/pom.xml
new file mode 100644
index 0000000..88ea1fa
--- /dev/null
+++ b/tests/itests-cxf/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.8.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-cxf</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: CXF</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-infra-common</artifactId>
+            <version>${camel.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-infra-dispatch-router</artifactId>
+            <version>${camel.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-cxf</artifactId>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-transports-http-jetty</artifactId>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-util</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-io</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-security</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-continuation</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-http</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-testutils</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+
+</project>
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
new file mode 100644
index 0000000..61c01c1
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.sink;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
+import javax.xml.ws.Endpoint;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.cxf.source.HelloService;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.ext.logging.LoggingInInterceptor;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.frontend.ServerFactoryBean;
+import org.apache.cxf.jaxws.EndpointImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CamelSinkCXFITCase extends AbstractKafkaTest {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkCXFITCase.class);
+      
+
+    private final int expect = 10;
+    
+    private final int simplePort = NetworkUtils.getFreePort("localhost");
+    private final int jaxwsPort = NetworkUtils.getFreePort("localhost");
+
+    protected static final String ECHO_OPERATION = "echo";
+    protected static final String GREET_ME_OPERATION = "greetMe";
+    protected static final String TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+        + "<soap:Body><ns1:echo xmlns:ns1=\"http://source.cxf.kafkaconnector.camel.apache.org/\">"
+        + "<arg0 xmlns=\"http://source.cxf.kafkaconnector.camel.apache.org/\">hello world</arg0>"
+        + "</ns1:echo></soap:Body></soap:Envelope>";
+    protected static final String JAXWS_TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">\"\n" + 
+        "        + \"<soap:Body><ns1:greetMe xmlns:ns1=\"http://apache.org/hello_world_soap_http/types\">\"\n" + 
+        "        + \"<requestType xmlns=\"http://apache.org/hello_world_soap_http/types\">hello world!</requestType>\"\n" + 
+        "        + \"</ns1:greetMe></soap:Body></soap:Envelope>";
+
+    protected Server server;
+    protected EndpointImpl endpoint;
+    
+    
+
+    
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-cxf-kafka-connector"};
+    }
+
+    protected String getSimpleServerAddress() {
+        return "http://localhost:" + simplePort + "/" + getClass().getSimpleName() + "/simpletest";
+    }
+
+    protected String getJaxWsServerAddress() {
+        return "http://localhost:" + jaxwsPort + "/" + getClass().getSimpleName() + "/jaxwstest";
+    }
+
+    
+    @BeforeEach
+    public void setUp() throws IOException {
+        // start a simple front service
+        ServerFactoryBean svrBean = new ServerFactoryBean();
+        svrBean.setAddress(getSimpleServerAddress());
+        svrBean.setServiceClass(HelloService.class);
+        svrBean.setServiceBean(new HelloServiceImpl());
+        svrBean.setBus(BusFactory.getDefaultBus());
+        server = svrBean.create();
+        server.getEndpoint().getInInterceptors().add(new LoggingInInterceptor());
+        server.getEndpoint().getOutInterceptors().add(new LoggingOutInterceptor());
+        // start a jaxws front service
+        GreeterImpl greeterImpl = new GreeterImpl();
+        endpoint = (EndpointImpl)Endpoint.publish(getJaxWsServerAddress(), greeterImpl);
+        endpoint.getInInterceptors().add(new LoggingInInterceptor());
+        endpoint.getOutInterceptors().add(new LoggingOutInterceptor());
+    }
+
+    @AfterEach
+    public void tearDown() {
+        endpoint.stop();
+        server.stop();
+        server.destroy();
+    }
+
+
+    private void putRecords(String message) {
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        for (int i = 0; i < expect; i++) {
+            try {
+                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), message);
+            } catch (ExecutionException e) {
+                LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+            } catch (InterruptedException e) {
+                break;
+            } 
+        }
+    }
+
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String message) throws ExecutionException, InterruptedException, TimeoutException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+        ExecutorService service = Executors.newCachedThreadPool();
+        Runnable r = () -> this.putRecords(message);
+        service.submit(r);
+        Thread.sleep(5000);
+        LOG.debug("Created the consumer ... About to receive messages");
+                
+    }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceiveUsingUrl() {
+        try {
+            
+
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory.basic()
+                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withAddress(getSimpleServerAddress())
+                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService")
+                    .withDataFormat("RAW");
+
+            runTest(connectorPropertyFactory, TEST_MESSAGE);
+        } catch (Exception e) {
+            LOG.error("CXF Sink test failed: {} {}", e.getMessage(), e);
+            fail(e.getMessage(), e);
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testJaxWsBasicSendReceiveUsingUrl() {
+        try {
+            
+
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory.basic()
+                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withAddress(this.getJaxwsEndpointUri())
+                    .withDataFormat("RAW");
+
+            runTest(connectorPropertyFactory, JAXWS_TEST_MESSAGE);
+        } catch (Exception e) {
+            LOG.error("CXF Sink test failed: {} {}", e.getMessage(), e);
+            fail(e.getMessage(), e);
+        }
+    }
+    
+    protected String getSimpleEndpointUri() {
+        return getSimpleServerAddress()
+               + "?serviceClass=org.apache.camel.kafkaconnector.cxf.source.HelloService";
+    }
+
+    protected String getJaxwsEndpointUri() {
+        return getJaxWsServerAddress() + "?serviceClass=org.apache.hello_world_soap_http.Greeter";
+    }
+
+    
+}
\ No newline at end of file
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java
new file mode 100644
index 0000000..e7ed6a7
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+
+final class CamelSinkCXFPropertyFactory extends SinkConnectorPropertyFactory<CamelSinkCXFPropertyFactory> {
+    private CamelSinkCXFPropertyFactory() {
+
+    }
+
+    
+
+    public EndpointUrlBuilder<CamelSinkCXFPropertyFactory> withUrl(String serviceUrl) {
+        String url = String.format("cxf://%s", serviceUrl);
+
+        return new EndpointUrlBuilder<>(this::withSinkUrl, url);
+    }
+    
+    public CamelSinkCXFPropertyFactory withDataFormat(String dataFormat) {
+        return setProperty("camel.sink.endpoint.dataFormat", dataFormat);
+    }
+    
+    public CamelSinkCXFPropertyFactory withAddress(String address) {
+        return setProperty("camel.sink.path.address", address);
+    }
+    
+    public CamelSinkCXFPropertyFactory withServiceClass(String serviceClass) {
+        return setProperty("camel.sink.endpoint.serviceClass", serviceClass);
+    }
+
+    public static CamelSinkCXFPropertyFactory basic() {
+        return new CamelSinkCXFPropertyFactory()
+                .withTasksMax(1)
+                .withName("CamelCXFSinkConnector")
+                .withConnectorClass("org.apache.camel.kafkaconnector.cxf.CamelCxfSinkConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
+
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
new file mode 100644
index 0000000..a5b909d
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.sink;
+
+import java.util.logging.Logger;
+
+
+public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
+    
+    private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
+    public String greetMe(String hi) {
+        LOG.info("jaxws greetMe " + hi);
+        return "Greet " + hi;
+    }
+}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java
new file mode 100644
index 0000000..42f12f5
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.cxf.sink;
+
+import java.util.List;
+
+import org.apache.camel.kafkaconnector.cxf.source.HelloService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HelloServiceImpl implements HelloService {
+    private static final Logger LOG = LoggerFactory.getLogger(HelloServiceImpl.class);
+    public static int invocationCount = 0;
+
+    private String name;
+
+    public HelloServiceImpl(String name) {
+        this.name = name;
+    }
+
+    public HelloServiceImpl() {
+        name = "";
+    }
+
+    @Override
+    public String echo(String text) {
+        LOG.info("call for echo with " + text);
+        invocationCount++;
+        LOG.info("invocationCount is " + invocationCount);
+        return "echo " + text;
+    }
+
+    @Override
+    public void ping() {
+        invocationCount++;
+        LOG.info("call for oneway ping");
+    }
+
+    @Override
+    public int getInvocationCount() {
+        return invocationCount;
+    }
+
+    @Override
+    public String sayHello() {
+        
+        return "hello" + name;
+    }
+
+    @Override
+    public Boolean echoBoolean(Boolean bool) {
+        LOG.info("call for echoBoolean with " + bool);
+        invocationCount++;
+        LOG.info("invocationCount is " + invocationCount);
+        return bool;
+    }
+
+    @Override
+    public String complexParameters(List<String> par1, List<String> par2) {
+        String result = "param";
+        if (par1 != null && par2 != null) {
+            result = result + ":" + par1.get(0) + par2.get(0);
+        }
+        return result;
+    }
+
+}
\ No newline at end of file
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
new file mode 100644
index 0000000..4ddf9e8
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.source;
+
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingInInterceptor;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.frontend.ClientFactoryBean;
+import org.apache.cxf.frontend.ClientProxyFactoryBean;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+/**
+ * A simple test case that checks whether the CXF Consumer Endpoint produces the expected number of
+ * messages
+ */
+public class CamelSourceCXFITCase extends AbstractKafkaTest {
+    
+    protected static final int PORT = NetworkUtils.getFreePort("localhost");
+    protected static final String SIMPLE_ENDPOINT_ADDRESS = "http://localhost:" + PORT
+        + "/CxfConsumerTest/test";
+    protected static final String SIMPLE_ENDPOINT_URI =  SIMPLE_ENDPOINT_ADDRESS
+        + "?serviceClass=org.apache.camel.kafkaconnector.cxf.source.HelloService"
+        + "&publishedEndpointUrl=http://www.simple.com/services/test";
+
+    
+    private static final String TEST_MESSAGE = "Hello World!";
+    
+
+    
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceCXFITCase.class);
+
+    private int received;
+    private final int expect = 1;
+    
+
+    
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-cxf-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        received = 0;
+        
+    }
+
+    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
+        LOG.debug("Received: {}", record.value());
+        
+        received++;
+
+        if (received == expect) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+
+    public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+        Thread.sleep(5000);//ensure cxf source connector is up
+        ClientProxyFactoryBean proxyFactory = new ClientProxyFactoryBean();
+        ClientFactoryBean clientBean = proxyFactory.getClientFactoryBean();
+        clientBean.setAddress(SIMPLE_ENDPOINT_ADDRESS);
+        clientBean.setServiceClass(HelloService.class);
+        Bus bus = BusFactory.newInstance().createBus();
+        clientBean.setBus(bus);
+        bus.getInInterceptors().add(new LoggingInInterceptor());
+        bus.getOutInterceptors().add(new LoggingOutInterceptor());
+        HelloService client = (HelloService) proxyFactory.create();
+        try {
+            String result = client.echo(TEST_MESSAGE);
+            assertEquals(result, TEST_MESSAGE);
+        } catch (Exception e) {
+            LOG.info("Test Invocation Failure", e);
+        }
+        
+        
+        LOG.debug("Creating the consumer ...");
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
+        LOG.debug("Created the consumer ...");
+
+        assertEquals(received, expect, "Didn't process the expected amount of messages");
+    }
+
+    
+
+    @Test
+    @Timeout(20)
+    public void testBasicSendReceive() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory
+                    .basic()
+                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withAddress(SIMPLE_ENDPOINT_ADDRESS)
+                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService");
+                                        
+
+            runBasicStringTest(connectorPropertyFactory);
+        } catch (Exception e) {
+            LOG.error("CXF test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+    
+    @Test
+    @Timeout(20)
+    public void testBasicSendReceiveUsingUrl() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory
+                    .basic()
+                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withUrl(SIMPLE_ENDPOINT_URI).buildUrl();
+                    
+
+            runBasicStringTest(connectorPropertyFactory);
+        } catch (Exception e) {
+            LOG.error("CXF test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+
+    
+    @Test
+    @Timeout(20)
+    public void testBasicSendReceiveUsingDataFormat() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory
+                .basic()
+                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withAddress(SIMPLE_ENDPOINT_ADDRESS)
+                .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService")
+                .withDataFormat("POJO");
+                    
+
+            runBasicStringTest(connectorPropertyFactory);
+        } catch (Exception e) {
+            LOG.error("CXF test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+
+    
+}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFPropertyFactory.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFPropertyFactory.java
new file mode 100644
index 0000000..7d054e5
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFPropertyFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.source;
+
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+
+/**
+ * Creates the set of properties used by a Camel CXF Source Connector
+ */
+final class CamelSourceCXFPropertyFactory extends SourceConnectorPropertyFactory<CamelSourceCXFPropertyFactory> {
+    private CamelSourceCXFPropertyFactory() {
+
+    }
+
+    public CamelSourceCXFPropertyFactory withAddress(String address) {
+        return setProperty("camel.source.path.address", address);
+    }
+    
+    public CamelSourceCXFPropertyFactory withServiceClass(String serviceClass) {
+        return setProperty("camel.source.endpoint.serviceClass", serviceClass);
+    }
+    
+    public CamelSourceCXFPropertyFactory withPublishedEndpointUrl(String publishedEndpointUrl) {
+        return setProperty("camel.source.endpoint.publishedEndpointUrl", publishedEndpointUrl);
+    }
+    
+    public CamelSourceCXFPropertyFactory withDataFormat(String dataFormat) {
+        return setProperty("camel.source.endpoint.dataFormat", dataFormat);
+    }
+        
+    public static CamelSourceCXFPropertyFactory basic() {
+        return new CamelSourceCXFPropertyFactory()
+                .withName("CamelCXFSourceConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.cxf.CamelCxfSourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+
+    public EndpointUrlBuilder<CamelSourceCXFPropertyFactory> withUrl(String cxfUrl) {
+        String url = String.format("cxf://%s", cxfUrl);
+        return new EndpointUrlBuilder<>(this::withSourceUrl, url);
+    }
+
+    
+}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java
new file mode 100644
index 0000000..5c4653f
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.source;
+
+import java.util.List;
+
+public interface HelloService {
+    String sayHello();
+
+    void ping();
+
+    int getInvocationCount();
+
+    String echo(String text) throws Exception;
+
+    Boolean echoBoolean(Boolean bool);
+
+    String complexParameters(List<String> par1, List<String> par2);
+
+}


[camel-kafka-connector] 01/10: Avoid creating everything from scratch for every Cassandra sink test

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 9a156c35855c9e3a39d0ea6c52aa1e6c1556b280
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Feb 9 17:24:55 2021 +0100

    Avoid creating everything from scratch for every Cassandra sink test
---
 .../cassandra/sink/CamelSinkCassandraITCase.java            | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
index b88946e..47e2593 100644
--- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
+++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
@@ -28,6 +28,7 @@ import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.cassandra.services.CassandraService;
 import org.apache.camel.test.infra.cassandra.services.CassandraServiceFactory;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -58,18 +59,22 @@ public class CamelSinkCassandraITCase extends CamelSinkTestSupport {
         return new String[] {"camel-cql-kafka-connector"};
     }
 
-    @BeforeEach
-    public void setUp() {
-        topicName = getTopicForTest(this);
+    @BeforeAll
+    public void setUpTestData() {
         cassandraClient = new CassandraClient(cassandraService.getCassandraHost(), cassandraService.getCQL3Port());
 
         testDataDao = cassandraClient.newTestDataDao();
 
         testDataDao.createKeySpace();
         testDataDao.useKeySpace();
-        testDataDao.createTable();
+    }
 
+    @BeforeEach
+    public void setUp() {
+        topicName = getTopicForTest(this);
         received = 0;
+
+        testDataDao.createTable();
     }
 
     @AfterEach