You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/19 10:51:09 UTC
[camel-kafka-connector] branch camel-master updated: Refactor the CXF servers into a service for simpler reuse
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/camel-master by this push:
new 4870fd5 Refactor the CXF servers into a service for simpler reuse
4870fd5 is described below
commit 4870fd543337637a1f7fcc1d1c0aacdea741dfbe
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Thu Feb 18 10:53:48 2021 +0100
Refactor the CXF servers into a service for simpler reuse
---
.../GreeterImpl.java => common/CXFProperties.java} | 13 +--
.../cxf/services/CXFEmbeddedServerService.java | 107 ++++++++++++++++++
.../GreeterImpl.java => services/CXFService.java} | 26 +++--
.../JaxWsServiceConfigurator.java} | 15 +--
.../ServerFactoryBeanConfigurator.java} | 15 +--
.../cxf/sink/CamelSinkCXFITCase.java | 119 ++++++++-------------
.../cxf/sink/CamelSinkCXFPropertyFactory.java | 3 +-
.../camel/kafkaconnector/cxf/sink/GreeterImpl.java | 20 +++-
...Impl.java => SinkJaxWsServiceConfigurator.java} | 16 +--
...java => SinkServerFactoryBeanConfigurator.java} | 19 ++--
10 files changed, 223 insertions(+), 130 deletions(-)
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/common/CXFProperties.java
similarity index 69%
copy from tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
copy to tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/common/CXFProperties.java
index 9981604..fbd581a 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/common/CXFProperties.java
@@ -15,16 +15,13 @@
* limitations under the License.
*/
-package org.apache.camel.kafkaconnector.cxf.sink;
+package org.apache.camel.kafkaconnector.cxf.common;
-import java.util.logging.Logger;
+public final class CXFProperties {
+ public static final String SIMPLE_SERVER_ADDRESS = "cxf.simple.server.address";
+ public static final String JAXWS_SERVER_ADDRESS = "cxf.jaxws.server.address";
-public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
+ private CXFProperties() {
- private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
-
- public String greetMe(String hi) {
- LOG.info("jaxws greetMe " + hi);
- return "Greet " + hi;
}
}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/CXFEmbeddedServerService.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/CXFEmbeddedServerService.java
new file mode 100644
index 0000000..6a1026b
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/CXFEmbeddedServerService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.cxf.services;
+
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.cxf.common.CXFProperties;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.ext.logging.LoggingInInterceptor;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.frontend.ServerFactoryBean;
+import org.apache.cxf.jaxws.EndpointImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CXFEmbeddedServerService implements CXFService {
+ private static final Logger LOG = LoggerFactory.getLogger(CXFEmbeddedServerService.class);
+
+ private final ServerFactoryBeanConfigurator serverFactoryBeanConfigurator;
+ private final JaxWsServiceConfigurator jaxwsServiceConfigurator;
+ private Server server;
+ private EndpointImpl endpoint;
+
+ private int simplePort;
+ private int jaxWsPort;
+
+ public CXFEmbeddedServerService(ServerFactoryBeanConfigurator serverFactoryBeanConfigurator, JaxWsServiceConfigurator jaxwsServiceConfigurator) {
+ this.serverFactoryBeanConfigurator = serverFactoryBeanConfigurator;
+ this.jaxwsServiceConfigurator = jaxwsServiceConfigurator;
+ }
+
+ @Override
+ public String getSimpleServerAddress() {
+ return String.format("http://%s:%d/%s/simpletest", NetworkUtils.getHostname(), simplePort, getClass().getSimpleName());
+ }
+
+ @Override
+ public String getJaxWsServerAddress() {
+ return String.format("http://%s:%d/%s/jaxwstest", NetworkUtils.getHostname(), jaxWsPort, getClass().getSimpleName());
+ }
+
+ public void configure() {
+ simplePort = NetworkUtils.getFreePort();
+ jaxWsPort = NetworkUtils.getFreePort();
+
+ ServerFactoryBean svrBean = new ServerFactoryBean();
+
+ // start a simple front service
+ svrBean.setAddress(getSimpleServerAddress());
+
+ serverFactoryBeanConfigurator.configure(svrBean);
+ svrBean.setBus(BusFactory.getDefaultBus());
+
+ server = svrBean.create();
+ server.getEndpoint().getInInterceptors().add(new LoggingInInterceptor());
+ server.getEndpoint().getOutInterceptors().add(new LoggingOutInterceptor());
+
+ // start a jaxws front service from the custom configurator
+ endpoint = jaxwsServiceConfigurator.configureEndpoint(getJaxWsServerAddress());
+
+ endpoint.getInInterceptors().add(new LoggingInInterceptor());
+ endpoint.getOutInterceptors().add(new LoggingOutInterceptor());
+
+ TestUtils.waitFor(() -> endpoint.isPublished());
+ }
+
+ @Override
+ public void registerProperties() {
+ System.setProperty(CXFProperties.SIMPLE_SERVER_ADDRESS, getSimpleServerAddress());
+ System.setProperty(CXFProperties.JAXWS_SERVER_ADDRESS, getJaxWsServerAddress());
+ }
+
+ @Override
+ public void initialize() {
+ LOG.info("Trying to start the CXF embedded server");
+ configure();
+
+ registerProperties();
+
+ LOG.info("CXF simple service running at {}", getSimpleServerAddress());
+ LOG.info("CXF JAX WS service running at {}", getJaxWsServerAddress());
+ }
+
+ @Override
+ public void shutdown() {
+ LOG.info("Stopping the CXF embedded server");
+
+ endpoint.stop();
+ server.stop();
+ server.destroy();
+ }
+}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/CXFService.java
similarity index 52%
copy from tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
copy to tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/CXFService.java
index 9981604..b89f043 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/CXFService.java
@@ -14,17 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.camel.kafkaconnector.cxf.services;
-package org.apache.camel.kafkaconnector.cxf.sink;
+import org.apache.camel.test.infra.common.services.TestService;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
-import java.util.logging.Logger;
+/**
+ * Test infra service for CXF
+ */
+public interface CXFService extends BeforeEachCallback, AfterEachCallback, TestService {
+
+ String getSimpleServerAddress();
-public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
+ String getJaxWsServerAddress();
- private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
+ @Override
+ default void beforeEach(ExtensionContext extensionContext) throws Exception {
+ initialize();
+ }
- public String greetMe(String hi) {
- LOG.info("jaxws greetMe " + hi);
- return "Greet " + hi;
+ @Override
+ default void afterEach(ExtensionContext extensionContext) throws Exception {
+ shutdown();
}
}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/JaxWsServiceConfigurator.java
similarity index 68%
copy from tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
copy to tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/JaxWsServiceConfigurator.java
index 9981604..47c5d09 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/JaxWsServiceConfigurator.java
@@ -14,17 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.camel.kafkaconnector.cxf.services;
-package org.apache.camel.kafkaconnector.cxf.sink;
+import org.apache.cxf.jaxws.EndpointImpl;
-import java.util.logging.Logger;
-
-public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
-
- private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
-
- public String greetMe(String hi) {
- LOG.info("jaxws greetMe " + hi);
- return "Greet " + hi;
- }
+public interface JaxWsServiceConfigurator {
+ EndpointImpl configureEndpoint(String address);
}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/ServerFactoryBeanConfigurator.java
similarity index 68%
copy from tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
copy to tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/ServerFactoryBeanConfigurator.java
index 9981604..2dfa1c2 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/services/ServerFactoryBeanConfigurator.java
@@ -14,17 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.camel.kafkaconnector.cxf.services;
-package org.apache.camel.kafkaconnector.cxf.sink;
+import org.apache.cxf.frontend.ServerFactoryBean;
-import java.util.logging.Logger;
-
-public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
-
- private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
-
- public String greetMe(String hi) {
- LOG.info("jaxws greetMe " + hi);
- return "Greet " + hi;
- }
+public interface ServerFactoryBeanConfigurator {
+ void configure(ServerFactoryBean serverFactoryBean);
}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
index 830f804..68bb9e7 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
@@ -17,101 +17,69 @@
package org.apache.camel.kafkaconnector.cxf.sink;
-import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
-import javax.xml.ws.Endpoint;
-
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.apache.camel.kafkaconnector.cxf.source.HelloService;
-import org.apache.cxf.BusFactory;
-import org.apache.cxf.endpoint.Server;
-import org.apache.cxf.ext.logging.LoggingInInterceptor;
-import org.apache.cxf.ext.logging.LoggingOutInterceptor;
-import org.apache.cxf.frontend.ServerFactoryBean;
-import org.apache.cxf.jaxws.EndpointImpl;
-import org.junit.jupiter.api.AfterEach;
+import org.apache.camel.kafkaconnector.cxf.services.CXFEmbeddedServerService;
+import org.apache.camel.kafkaconnector.cxf.services.CXFService;
+import org.apache.camel.kafkaconnector.cxf.services.JaxWsServiceConfigurator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class CamelSinkCXFITCase extends AbstractKafkaTest {
- protected static final String ECHO_OPERATION = "echo";
- protected static final String GREET_ME_OPERATION = "greetMe";
protected static final String TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+ "<soap:Body><ns1:echo xmlns:ns1=\"http://source.cxf.kafkaconnector.camel.apache.org/\">"
+ "<arg0 xmlns=\"http://source.cxf.kafkaconnector.camel.apache.org/\">hello world</arg0>"
+ "</ns1:echo></soap:Body></soap:Envelope>";
+
protected static final String JAXWS_TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">\"\n"
+ " + \"<soap:Body><ns1:greetMe xmlns:ns1=\"http://apache.org/hello_world_soap_http/types\">\"\n"
+ " + \"<requestType xmlns=\"http://apache.org/hello_world_soap_http/types\">hello world!</requestType>\"\n"
+ " + \"</ns1:greetMe></soap:Body></soap:Envelope>";
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkCXFITCase.class);
+
+ private SinkServerFactoryBeanConfigurator serverFactoryBeanConfigurator = new SinkServerFactoryBeanConfigurator();
+ private JaxWsServiceConfigurator jaxWsServiceConfigurator = new SinkJaxWsServiceConfigurator();
+
+ @RegisterExtension
+ public CXFService service = new CXFEmbeddedServerService(serverFactoryBeanConfigurator, jaxWsServiceConfigurator);
- protected Server server;
- protected EndpointImpl endpoint;
- private final int simplePort = NetworkUtils.getFreePort();
- private final int jaxwsPort = NetworkUtils.getFreePort();
private final int expect = 10;
+ private String topicName;
@Override
protected String[] getConnectorsInTest() {
- return new String[] {"camel-cxf-kafka-connector"};
- }
-
- protected String getSimpleServerAddress() {
- return "http://" + NetworkUtils.getHostname() + ":" + simplePort + "/" + getClass().getSimpleName() + "/simpletest";
- }
-
- protected String getJaxWsServerAddress() {
- return "http://" + NetworkUtils.getHostname() + ":" + jaxwsPort + "/" + getClass().getSimpleName() + "/jaxwstest";
+ return new String[]{"camel-cxf-kafka-connector"};
}
@BeforeEach
- public void setUp() throws IOException {
- // start a simple front service
- ServerFactoryBean svrBean = new ServerFactoryBean();
- svrBean.setAddress(getSimpleServerAddress());
- svrBean.setServiceClass(HelloService.class);
- svrBean.setServiceBean(new HelloServiceImpl());
- svrBean.setBus(BusFactory.getDefaultBus());
- server = svrBean.create();
- server.getEndpoint().getInInterceptors().add(new LoggingInInterceptor());
- server.getEndpoint().getOutInterceptors().add(new LoggingOutInterceptor());
- // start a jaxws front service
- GreeterImpl greeterImpl = new GreeterImpl();
- endpoint = (EndpointImpl) Endpoint.publish(getJaxWsServerAddress(), greeterImpl);
- endpoint.getInInterceptors().add(new LoggingInInterceptor());
- endpoint.getOutInterceptors().add(new LoggingOutInterceptor());
- }
-
- @AfterEach
- public void tearDown() {
- endpoint.stop();
- server.stop();
- server.destroy();
+ public void setUp() {
+ topicName = getTopicForTest(this);
+ GreeterImpl.outputFile().delete();
}
- private void putRecords(String message) {
+ private void putRecords(String message, int count) {
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- for (int i = 0; i < expect; i++) {
+ for (int i = 0; i < count; i++) {
try {
- kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), message);
+ kafkaClient.produce(topicName, message);
} catch (ExecutionException e) {
LOG.error("Unable to produce messages: {}", e.getMessage(), e);
} catch (InterruptedException e) {
@@ -120,29 +88,35 @@ public class CamelSinkCXFITCase extends AbstractKafkaTest {
}
}
- public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String message)
+ public String getJaxwsEndpointUri() {
+ return service.getJaxWsServerAddress() + "?serviceClass=org.apache.hello_world_soap_http.Greeter";
+ }
+
+ public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String message, int count)
throws ExecutionException, InterruptedException, TimeoutException {
connectorPropertyFactory.log();
- getKafkaConnectService().initializeConnector(connectorPropertyFactory);
getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
ExecutorService service = Executors.newCachedThreadPool();
- Runnable r = () -> this.putRecords(message);
+ Runnable r = () -> this.putRecords(message, count);
service.submit(r);
Thread.sleep(5000);
LOG.debug("Created the consumer ... About to receive messages");
-
}
@Test
- @Timeout(90)
public void testBasicSendReceiveUsingUrl() {
try {
+ ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory
+ .basic()
+ .withName("CamelCXFSinkConnector")
+ .withTopics(topicName)
+ .withAddress(service.getSimpleServerAddress())
+ .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService")
+ .withDataFormat("RAW");
- ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory.basic()
- .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(getSimpleServerAddress())
- .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService").withDataFormat("RAW");
+ runTest(connectorPropertyFactory, TEST_MESSAGE, expect);
- runTest(connectorPropertyFactory, TEST_MESSAGE);
+ assertEquals(expect, serverFactoryBeanConfigurator.getInvocationCount());
} catch (Exception e) {
LOG.error("CXF Sink test failed: {} {}", e.getMessage(), e);
fail(e.getMessage(), e);
@@ -153,24 +127,19 @@ public class CamelSinkCXFITCase extends AbstractKafkaTest {
@Timeout(90)
public void testJaxWsBasicSendReceiveUsingUrl() {
try {
-
- ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory.basic()
- .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(this.getJaxwsEndpointUri())
+ ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory
+ .basic()
+ .withName("CamelCXFSinkConnectorUrl")
+ .withTopics(topicName)
+ .withAddress(getJaxwsEndpointUri())
.withDataFormat("RAW");
- runTest(connectorPropertyFactory, JAXWS_TEST_MESSAGE);
+ runTest(connectorPropertyFactory, JAXWS_TEST_MESSAGE, 1);
+
+ assertTrue(GreeterImpl.outputFile().exists(), "The test output file was not created");
} catch (Exception e) {
LOG.error("CXF Sink test failed: {} {}", e.getMessage(), e);
fail(e.getMessage(), e);
}
}
-
- protected String getSimpleEndpointUri() {
- return getSimpleServerAddress() + "?serviceClass=org.apache.camel.kafkaconnector.cxf.source.HelloService";
- }
-
- protected String getJaxwsEndpointUri() {
- return getJaxWsServerAddress() + "?serviceClass=org.apache.hello_world_soap_http.Greeter";
- }
-
}
\ No newline at end of file
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java
index 3e814df..f346686 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java
@@ -44,7 +44,8 @@ final class CamelSinkCXFPropertyFactory extends SinkConnectorPropertyFactory<Cam
}
public static CamelSinkCXFPropertyFactory basic() {
- return new CamelSinkCXFPropertyFactory().withTasksMax(1).withName("CamelCXFSinkConnector")
+ return new CamelSinkCXFPropertyFactory()
+ .withTasksMax(1)
.withConnectorClass("org.apache.camel.kafkaconnector.cxf.CamelCxfSinkConnector")
.withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
.withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
index 9981604..bccef12 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
@@ -17,14 +17,30 @@
package org.apache.camel.kafkaconnector.cxf.sink;
+import java.io.File;
+import java.io.IOException;
import java.util.logging.Logger;
public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
-
private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
public String greetMe(String hi) {
- LOG.info("jaxws greetMe " + hi);
+ File outputFile = outputFile();
+
+ try {
+ outputFile.createNewFile();
+ LOG.info("jaxws greetMe " + hi);
+
+ } catch (IOException e) {
+ LOG.warning("Failed to create result test file");
+ }
+
return "Greet " + hi;
}
+
+ public static File outputFile() {
+ String path = GreeterImpl.class.getResource(".").getFile();
+
+ return new File(path, "cxf.test.result");
+ }
}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkJaxWsServiceConfigurator.java
similarity index 67%
copy from tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
copy to tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkJaxWsServiceConfigurator.java
index 9981604..fd7b56f 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkJaxWsServiceConfigurator.java
@@ -14,17 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.camel.kafkaconnector.cxf.sink;
-import java.util.logging.Logger;
-
-public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
+import javax.xml.ws.Endpoint;
- private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
+import org.apache.camel.kafkaconnector.cxf.services.JaxWsServiceConfigurator;
+import org.apache.cxf.jaxws.EndpointImpl;
- public String greetMe(String hi) {
- LOG.info("jaxws greetMe " + hi);
- return "Greet " + hi;
+class SinkJaxWsServiceConfigurator implements JaxWsServiceConfigurator {
+ @Override
+ public EndpointImpl configureEndpoint(String address) {
+ GreeterImpl greeterImpl = new GreeterImpl();
+ return (EndpointImpl) Endpoint.publish(address, greeterImpl);
}
}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java
similarity index 57%
copy from tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
copy to tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java
index 9981604..28860da 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java
@@ -14,17 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.camel.kafkaconnector.cxf.sink;
-import java.util.logging.Logger;
+import org.apache.camel.kafkaconnector.cxf.services.ServerFactoryBeanConfigurator;
+import org.apache.camel.kafkaconnector.cxf.source.HelloService;
+import org.apache.cxf.frontend.ServerFactoryBean;
-public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
+class SinkServerFactoryBeanConfigurator implements ServerFactoryBeanConfigurator {
+ private HelloService helloService = new HelloServiceImpl();
- private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
+ @Override
+ public void configure(ServerFactoryBean serverFactoryBean) {
+ serverFactoryBean.setServiceClass(HelloService.class);
+ serverFactoryBean.setServiceBean(helloService);
+ }
- public String greetMe(String hi) {
- LOG.info("jaxws greetMe " + hi);
- return "Greet " + hi;
+ public int getInvocationCount() {
+ return helloService.getInvocationCount();
}
}