You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/03/29 05:52:42 UTC
[camel-kafka-connector] 02/08: Adjusted the code to CAMEL-16299
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch 3.9.0-rework
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 6aee8330f1b2764134f9ee27a9954e4067a1beb7
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Mar 10 15:18:24 2021 +0100
Adjusted the code to CAMEL-16299
---
.../blob/sink/CamelSinkAzureStorageBlobITCase.java | 2 +-
.../sink/CamelSinkAzureStorageQueueITCase.java | 2 +-
.../source/CamelSourceAzureStorageQueueITCase.java | 2 +-
.../kafkaconnector/common/AbstractKafkaTest.java | 15 ++-
.../common/services/kafka/KafkaServiceFactory.java | 45 ---------
.../common/services/kafka/StrimziService.java | 103 ---------------------
.../kafkaconnector/sjms2/common/SJMS2Common.java | 9 ++
.../sjms2/sink/CamelSinkIdempotentJMSITCase.java | 9 +-
.../sjms2/sink/CamelSinkJMSITCase.java | 9 +-
.../sjms2/source/CamelSourceJMSITCase.java | 9 +-
.../source/CamelSourceJMSWithAggregation.java | 9 +-
11 files changed, 41 insertions(+), 173 deletions(-)
diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
index 727d3fb..e7cb8e5 100644
--- a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
@@ -51,7 +51,7 @@ import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport {
@RegisterExtension
- public static AzureService service = AzureStorageBlobServiceFactory.createAzureService();
+ public static AzureService service = AzureStorageBlobServiceFactory.createService();
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAzureStorageBlobITCase.class);
private BlobServiceClient client;
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
index ef12c18..78a297c 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
@@ -46,7 +46,7 @@ import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSinkAzureStorageQueueITCase extends CamelSinkTestSupport {
@RegisterExtension
- public static AzureService service = AzureStorageQueueServiceFactory.createAzureService();
+ public static AzureService service = AzureStorageQueueServiceFactory.createService();
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAzureStorageQueueITCase.class);
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
index 26faa6e..da640da 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
@@ -45,7 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSourceAzureStorageQueueITCase extends CamelSourceTestSupport {
@RegisterExtension
- public static AzureService service = AzureStorageQueueServiceFactory.createAzureService();
+ public static AzureService service = AzureStorageQueueServiceFactory.createService();
private QueueServiceClient client;
private QueueClient queueClient;
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
index fb24ca9..ee332a8 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
@@ -17,12 +17,16 @@
package org.apache.camel.kafkaconnector.common;
-import org.apache.camel.kafkaconnector.common.services.kafka.KafkaServiceFactory;
+import org.apache.camel.kafkaconnector.common.services.kafka.EmbeddedKafkaService;
import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectRunnerFactory;
import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectService;
import org.apache.camel.kafkaconnector.common.utils.PropertyUtils;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.kafka.services.ContainerLocalKafkaService;
import org.apache.camel.test.infra.kafka.services.KafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
+import org.apache.camel.test.infra.kafka.services.RemoteKafkaService;
+import org.apache.camel.test.infra.kafka.services.StrimziService;
import org.junit.jupiter.api.extension.RegisterExtension;
public abstract class AbstractKafkaTest {
@@ -39,7 +43,14 @@ public abstract class AbstractKafkaTest {
public AbstractKafkaTest() {
PluginPathHelper.getInstance().registerConnector(getConnectorsInTest());
- kafkaService = KafkaServiceFactory.createService();
+ kafkaService = KafkaServiceFactory
+ .builder()
+ .addLocalMapping(EmbeddedKafkaService::new)
+ .addRemoteMapping(RemoteKafkaService::new)
+ .addMapping("embedded", EmbeddedKafkaService::new)
+ .addMapping("local-strimzi-container", StrimziService::new)
+ .addMapping("local-cp-kafka-container", ContainerLocalKafkaService::new)
+ .build();
kafkaService.initialize();
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java
deleted file mode 100644
index 052ba9c..0000000
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.common.services.kafka;
-
-import org.apache.camel.test.infra.kafka.services.KafkaService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class KafkaServiceFactory {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaServiceFactory.class);
-
- private KafkaServiceFactory() {
-
- }
-
- public static KafkaService createService() {
- String kafkaInstanceType = System.getProperty("kafka.instance.type");
-
- if (kafkaInstanceType == null || kafkaInstanceType.equals("embedded")) {
- return new EmbeddedKafkaService();
- }
-
- if (kafkaInstanceType.equals("local-strimzi-container")) {
- return new StrimziService();
- }
-
- return org.apache.camel.test.infra.kafka.services.KafkaServiceFactory.createService();
-
- }
-}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java
deleted file mode 100644
index 6f3b03e..0000000
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.common.services.kafka;
-
-import org.apache.camel.test.infra.common.TestUtils;
-import org.apache.camel.test.infra.common.services.ContainerService;
-import org.apache.camel.test.infra.kafka.common.KafkaProperties;
-import org.apache.camel.test.infra.kafka.services.KafkaService;
-import org.apache.camel.test.infra.kafka.services.StrimziContainer;
-import org.apache.camel.test.infra.kafka.services.ZookeeperContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Network;
-
-class StrimziService implements KafkaService, ContainerService<StrimziContainer> {
- private static final Logger LOG = LoggerFactory.getLogger(StrimziService.class);
-
- private final ZookeeperContainer zookeeperContainer;
- private final StrimziContainer strimziContainer;
-
- public StrimziService() {
- Network network = Network.newNetwork();
-
- String zookeeperInstanceName = "zookeeper-" + TestUtils.randomWithRange(1, 100);
- zookeeperContainer = new ZookeeperContainer(network, zookeeperInstanceName);
-
- String strimziInstanceName = "strimzi-" + TestUtils.randomWithRange(1, 100);
- strimziContainer = new StrimziContainer(network, strimziInstanceName, zookeeperInstanceName);
- }
-
- public StrimziService(ZookeeperContainer zookeeperContainer, StrimziContainer strimziContainer) {
- this.zookeeperContainer = zookeeperContainer;
- this.strimziContainer = strimziContainer;
- }
-
- protected Integer getKafkaPort() {
- return strimziContainer.getKafkaPort();
- }
-
- @Override
- public String getBootstrapServers() {
- return strimziContainer.getContainerIpAddress() + ":" + getKafkaPort();
- }
-
- @Override
- public void registerProperties() {
- System.setProperty(KafkaProperties.KAFKA_BOOTSTRAP_SERVERS, getBootstrapServers());
- }
-
- @Override
- public void initialize() {
- zookeeperContainer.start();
-
- String zookeeperConnect = zookeeperContainer.getContainerIpAddress() + ":" + zookeeperContainer.getZookeeperPort();
- LOG.info("Apache Zookeeper running at address {}", zookeeperConnect);
-
- strimziContainer.start();
-
- registerProperties();
- LOG.info("Kafka bootstrap server running at address {}", getBootstrapServers());
- }
-
- private boolean stopped() {
- return !strimziContainer.isRunning() && !zookeeperContainer.isRunning();
- }
-
- @Override
- public void shutdown() {
- try {
- LOG.info("Stopping Kafka container");
- strimziContainer.stop();
- } finally {
- LOG.info("Stopping Zookeeper container");
- zookeeperContainer.stop();
-
- TestUtils.waitFor(this::stopped);
- }
- }
-
- @Override
- public StrimziContainer getContainer() {
- return strimziContainer;
- }
-
- protected ZookeeperContainer getZookeeperContainer() {
- return zookeeperContainer;
- }
-}
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/common/SJMS2Common.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/common/SJMS2Common.java
index 24e9f9a..44e6a94 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/common/SJMS2Common.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/common/SJMS2Common.java
@@ -17,6 +17,9 @@
package org.apache.camel.kafkaconnector.sjms2.common;
+import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
+import org.apache.camel.test.infra.messaging.services.MessagingLocalContainerService;
+
public final class SJMS2Common {
/**
* The default JMS queue name used during the tests
@@ -26,4 +29,10 @@ public final class SJMS2Common {
private SJMS2Common() {
}
+
+ public static MessagingLocalContainerService<DispatchRouterContainer> createLocalService() {
+ DispatchRouterContainer container = new DispatchRouterContainer();
+
+ return new MessagingLocalContainerService<>(container, c -> container.defaultEndpoint());
+ }
}
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
index 0b8bb52..d0273dc 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -35,9 +35,8 @@ import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
-import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
import org.apache.camel.test.infra.messaging.services.MessagingService;
-import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
+import org.apache.camel.test.infra.messaging.services.MessagingServiceFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -55,9 +54,9 @@ import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSinkIdempotentJMSITCase extends CamelSinkTestSupport {
@RegisterExtension
- public static MessagingService jmsService = MessagingServiceBuilder
- .newBuilder(DispatchRouterContainer::new)
- .withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
+ public static MessagingService jmsService = MessagingServiceFactory
+ .builder()
+ .addLocalMapping(SJMS2Common::createLocalService)
.build();
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkIdempotentJMSITCase.class);
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
index 50dabe6..b56c580 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
@@ -30,9 +30,8 @@ import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
-import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
import org.apache.camel.test.infra.messaging.services.MessagingService;
-import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
+import org.apache.camel.test.infra.messaging.services.MessagingServiceFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -50,9 +49,9 @@ import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSinkJMSITCase extends CamelSinkTestSupport {
@RegisterExtension
- public static MessagingService jmsService = MessagingServiceBuilder
- .newBuilder(DispatchRouterContainer::new)
- .withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
+ public static MessagingService jmsService = MessagingServiceFactory
+ .builder()
+ .addLocalMapping(SJMS2Common::createLocalService)
.build();
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJMSITCase.class);
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
index 781e029..ffbee12 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
@@ -27,9 +27,8 @@ import org.apache.camel.kafkaconnector.common.test.IntegerMessageConsumer;
import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
-import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
import org.apache.camel.test.infra.messaging.services.MessagingService;
-import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
+import org.apache.camel.test.infra.messaging.services.MessagingServiceFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -47,9 +46,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSourceJMSITCase extends CamelSourceTestSupport {
@RegisterExtension
- public static MessagingService jmsService = MessagingServiceBuilder
- .newBuilder(DispatchRouterContainer::new)
- .withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
+ public static MessagingService jmsService = MessagingServiceFactory
+ .builder()
+ .addLocalMapping(SJMS2Common::createLocalService)
.build();
private String topicName;
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
index 6b95304..ce25c7b 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
@@ -28,9 +28,8 @@ import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
-import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
import org.apache.camel.test.infra.messaging.services.MessagingService;
-import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
+import org.apache.camel.test.infra.messaging.services.MessagingServiceFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -44,9 +43,9 @@ import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSourceJMSWithAggregation extends CamelSourceTestSupport {
@RegisterExtension
- public static MessagingService jmsService = MessagingServiceBuilder
- .newBuilder(DispatchRouterContainer::new)
- .withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
+ public static MessagingService jmsService = MessagingServiceFactory
+ .builder()
+ .addLocalMapping(SJMS2Common::createLocalService)
.build();
private final int sentSize = 10;