You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/12/17 07:37:35 UTC
[camel-kafka-connector] 02/02: Properties cleanups
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 29ccb8226f0a5c2f540a8c7fbb4911a88d43e6c9
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Nov 16 09:23:32 2020 +0100
Properties cleanups
---
.../kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java | 15 ++++++++++++---
.../sjms2/source/CamelSourceJMSITCase.java | 17 +++++++++++++----
.../sjms2/source/CamelSourceJMSWithAggregation.java | 12 ++++++++++--
3 files changed, 35 insertions(+), 9 deletions(-)
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
index 1b1054f..b97e40f 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
@@ -17,6 +17,7 @@
package org.apache.camel.kafkaconnector.sjms2.sink;
+import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -56,7 +57,6 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
public static MessagingService jmsService = MessagingServiceBuilder
.newBuilder(DispatchRouterContainer::new)
.withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
- .withPropertiesProvider(DispatchRouterContainer::connectionProperties)
.build();
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJMSITCase.class);
@@ -64,6 +64,15 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
private int received;
private final int expect = 10;
+ private Properties connectionProperties() {
+ Properties properties = new Properties();
+
+ properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.qpid.jms.JmsConnectionFactory");
+ properties.put("camel.component.sjms2.connection-factory.remoteURI", jmsService.defaultEndpoint());
+
+ return properties;
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-sjms2-kafka-connector"};
@@ -129,7 +138,7 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
.basic()
.withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
- .withConnectionProperties(jmsService.connectionProperties())
+ .withConnectionProperties(connectionProperties())
.withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE);
runTest(connectorPropertyFactory);
@@ -147,7 +156,7 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
.basic()
.withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
- .withConnectionProperties(jmsService.connectionProperties())
+ .withConnectionProperties(connectionProperties())
.withUrl(SJMS2Common.DEFAULT_JMS_QUEUE)
.buildUrl();
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
index 7f0f5c4..5b23e2a 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
@@ -17,6 +17,7 @@
package org.apache.camel.kafkaconnector.sjms2.source;
+import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
@@ -50,7 +51,6 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest {
@RegisterExtension
public static MessagingService jmsService = MessagingServiceBuilder
.newBuilder(DispatchRouterContainer::new)
- .withPropertiesProvider(DispatchRouterContainer::connectionProperties)
.withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
.build();
@@ -60,6 +60,15 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest {
private final int expect = 10;
private JMSClient jmsClient;
+ private Properties connectionProperties() {
+ Properties properties = new Properties();
+
+ properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.qpid.jms.JmsConnectionFactory");
+ properties.put("camel.component.sjms2.connection-factory.remoteURI", jmsService.defaultEndpoint());
+
+ return properties;
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-sjms2-kafka-connector"};
@@ -106,7 +115,7 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest {
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
- .withConnectionProperties(jmsService.connectionProperties());
+ .withConnectionProperties(connectionProperties());
runBasicStringTest(connectorPropertyFactory);
} catch (Exception e) {
@@ -121,7 +130,7 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest {
try {
ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
.basic()
- .withConnectionProperties(jmsService.connectionProperties())
+ .withConnectionProperties(connectionProperties())
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withUrl(SJMS2Common.DEFAULT_JMS_QUEUE)
.buildUrl();
@@ -144,7 +153,7 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest {
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()) + jmsQueueName)
.withDestinationName(jmsQueueName)
- .withConnectionProperties(jmsService.connectionProperties());
+ .withConnectionProperties(connectionProperties());
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnector(connectorPropertyFactory);
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
index 0a66154..4c96ab4 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
@@ -17,6 +17,7 @@
package org.apache.camel.kafkaconnector.sjms2.source;
+import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
@@ -43,7 +44,6 @@ public class CamelSourceJMSWithAggregation extends AbstractKafkaTest {
@RegisterExtension
public static MessagingService jmsService = MessagingServiceBuilder
.newBuilder(DispatchRouterContainer::new)
- .withPropertiesProvider(DispatchRouterContainer::connectionProperties)
.withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
.build();
@@ -57,6 +57,14 @@ public class CamelSourceJMSWithAggregation extends AbstractKafkaTest {
private String expectedMessage = "";
private String queueName;
+ private Properties connectionProperties() {
+ Properties properties = new Properties();
+
+ properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.qpid.jms.JmsConnectionFactory");
+ properties.put("camel.component.sjms2.connection-factory.remoteURI", jmsService.defaultEndpoint());
+
+ return properties;
+ }
@Override
protected String[] getConnectorsInTest() {
@@ -112,7 +120,7 @@ public class CamelSourceJMSWithAggregation extends AbstractKafkaTest {
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withDestinationName(queueName)
- .withConnectionProperties(jmsService.connectionProperties())
+ .withConnectionProperties(connectionProperties())
.withAggregate("org.apache.camel.kafkaconnector.aggregator.StringAggregator", sentSize,
1000);