You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/03/29 05:52:42 UTC

[camel-kafka-connector] 02/08: Adjusted the code to CAMEL-16299

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch 3.9.0-rework
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 6aee8330f1b2764134f9ee27a9954e4067a1beb7
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Mar 10 15:18:24 2021 +0100

    Adjusted the code to CAMEL-16299
---
 .../blob/sink/CamelSinkAzureStorageBlobITCase.java |   2 +-
 .../sink/CamelSinkAzureStorageQueueITCase.java     |   2 +-
 .../source/CamelSourceAzureStorageQueueITCase.java |   2 +-
 .../kafkaconnector/common/AbstractKafkaTest.java   |  15 ++-
 .../common/services/kafka/KafkaServiceFactory.java |  45 ---------
 .../common/services/kafka/StrimziService.java      | 103 ---------------------
 .../kafkaconnector/sjms2/common/SJMS2Common.java   |   9 ++
 .../sjms2/sink/CamelSinkIdempotentJMSITCase.java   |   9 +-
 .../sjms2/sink/CamelSinkJMSITCase.java             |   9 +-
 .../sjms2/source/CamelSourceJMSITCase.java         |   9 +-
 .../source/CamelSourceJMSWithAggregation.java      |   9 +-
 11 files changed, 41 insertions(+), 173 deletions(-)

diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
index 727d3fb..e7cb8e5 100644
--- a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
@@ -51,7 +51,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport {
     @RegisterExtension
-    public static AzureService service = AzureStorageBlobServiceFactory.createAzureService();
+    public static AzureService service = AzureStorageBlobServiceFactory.createService();
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAzureStorageBlobITCase.class);
 
     private BlobServiceClient client;
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
index ef12c18..78a297c 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
@@ -46,7 +46,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkAzureStorageQueueITCase extends CamelSinkTestSupport {
     @RegisterExtension
-    public static AzureService service = AzureStorageQueueServiceFactory.createAzureService();
+    public static AzureService service = AzureStorageQueueServiceFactory.createService();
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAzureStorageQueueITCase.class);
 
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
index 26faa6e..da640da 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
@@ -45,7 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSourceAzureStorageQueueITCase extends CamelSourceTestSupport {
     @RegisterExtension
-    public static AzureService service = AzureStorageQueueServiceFactory.createAzureService();
+    public static AzureService service = AzureStorageQueueServiceFactory.createService();
 
     private QueueServiceClient client;
     private QueueClient queueClient;
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
index fb24ca9..ee332a8 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
@@ -17,12 +17,16 @@
 
 package org.apache.camel.kafkaconnector.common;
 
-import org.apache.camel.kafkaconnector.common.services.kafka.KafkaServiceFactory;
+import org.apache.camel.kafkaconnector.common.services.kafka.EmbeddedKafkaService;
 import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectRunnerFactory;
 import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectService;
 import org.apache.camel.kafkaconnector.common.utils.PropertyUtils;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.kafka.services.ContainerLocalKafkaService;
 import org.apache.camel.test.infra.kafka.services.KafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
+import org.apache.camel.test.infra.kafka.services.RemoteKafkaService;
+import org.apache.camel.test.infra.kafka.services.StrimziService;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 public abstract class AbstractKafkaTest {
@@ -39,7 +43,14 @@ public abstract class AbstractKafkaTest {
     public AbstractKafkaTest() {
         PluginPathHelper.getInstance().registerConnector(getConnectorsInTest());
 
-        kafkaService = KafkaServiceFactory.createService();
+        kafkaService = KafkaServiceFactory
+                .builder()
+                .addLocalMapping(EmbeddedKafkaService::new)
+                .addRemoteMapping(RemoteKafkaService::new)
+                .addMapping("embedded", EmbeddedKafkaService::new)
+                .addMapping("local-strimzi-container", StrimziService::new)
+                .addMapping("local-cp-kafka-container", ContainerLocalKafkaService::new)
+                .build();
 
         kafkaService.initialize();
 
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java
deleted file mode 100644
index 052ba9c..0000000
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.common.services.kafka;
-
-import org.apache.camel.test.infra.kafka.services.KafkaService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class KafkaServiceFactory {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaServiceFactory.class);
-
-    private KafkaServiceFactory() {
-
-    }
-
-    public static KafkaService createService() {
-        String kafkaInstanceType = System.getProperty("kafka.instance.type");
-
-        if (kafkaInstanceType == null || kafkaInstanceType.equals("embedded")) {
-            return new EmbeddedKafkaService();
-        }
-
-        if (kafkaInstanceType.equals("local-strimzi-container")) {
-            return new StrimziService();
-        }
-
-        return org.apache.camel.test.infra.kafka.services.KafkaServiceFactory.createService();
-
-    }
-}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java
deleted file mode 100644
index 6f3b03e..0000000
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.common.services.kafka;
-
-import org.apache.camel.test.infra.common.TestUtils;
-import org.apache.camel.test.infra.common.services.ContainerService;
-import org.apache.camel.test.infra.kafka.common.KafkaProperties;
-import org.apache.camel.test.infra.kafka.services.KafkaService;
-import org.apache.camel.test.infra.kafka.services.StrimziContainer;
-import org.apache.camel.test.infra.kafka.services.ZookeeperContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Network;
-
-class StrimziService implements KafkaService, ContainerService<StrimziContainer> {
-    private static final Logger LOG = LoggerFactory.getLogger(StrimziService.class);
-
-    private final ZookeeperContainer zookeeperContainer;
-    private final StrimziContainer strimziContainer;
-
-    public StrimziService() {
-        Network network = Network.newNetwork();
-
-        String zookeeperInstanceName = "zookeeper-" + TestUtils.randomWithRange(1, 100);
-        zookeeperContainer = new ZookeeperContainer(network, zookeeperInstanceName);
-
-        String strimziInstanceName = "strimzi-" + TestUtils.randomWithRange(1, 100);
-        strimziContainer = new StrimziContainer(network, strimziInstanceName, zookeeperInstanceName);
-    }
-
-    public StrimziService(ZookeeperContainer zookeeperContainer, StrimziContainer strimziContainer) {
-        this.zookeeperContainer = zookeeperContainer;
-        this.strimziContainer = strimziContainer;
-    }
-
-    protected Integer getKafkaPort() {
-        return strimziContainer.getKafkaPort();
-    }
-
-    @Override
-    public String getBootstrapServers() {
-        return strimziContainer.getContainerIpAddress() + ":" + getKafkaPort();
-    }
-
-    @Override
-    public void registerProperties() {
-        System.setProperty(KafkaProperties.KAFKA_BOOTSTRAP_SERVERS, getBootstrapServers());
-    }
-
-    @Override
-    public void initialize() {
-        zookeeperContainer.start();
-
-        String zookeeperConnect = zookeeperContainer.getContainerIpAddress() + ":" + zookeeperContainer.getZookeeperPort();
-        LOG.info("Apache Zookeeper running at address {}", zookeeperConnect);
-
-        strimziContainer.start();
-
-        registerProperties();
-        LOG.info("Kafka bootstrap server running at address {}", getBootstrapServers());
-    }
-
-    private boolean stopped() {
-        return !strimziContainer.isRunning() && !zookeeperContainer.isRunning();
-    }
-
-    @Override
-    public void shutdown() {
-        try {
-            LOG.info("Stopping Kafka container");
-            strimziContainer.stop();
-        } finally {
-            LOG.info("Stopping Zookeeper container");
-            zookeeperContainer.stop();
-
-            TestUtils.waitFor(this::stopped);
-        }
-    }
-
-    @Override
-    public StrimziContainer getContainer() {
-        return strimziContainer;
-    }
-
-    protected ZookeeperContainer getZookeeperContainer() {
-        return zookeeperContainer;
-    }
-}
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/common/SJMS2Common.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/common/SJMS2Common.java
index 24e9f9a..44e6a94 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/common/SJMS2Common.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/common/SJMS2Common.java
@@ -17,6 +17,9 @@
 
 package org.apache.camel.kafkaconnector.sjms2.common;
 
+import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
+import org.apache.camel.test.infra.messaging.services.MessagingLocalContainerService;
+
 public final class SJMS2Common {
     /**
      * The default JMS queue name used during the tests
@@ -26,4 +29,10 @@ public final class SJMS2Common {
     private SJMS2Common() {
 
     }
+
+    public static MessagingLocalContainerService<DispatchRouterContainer> createLocalService() {
+        DispatchRouterContainer container = new DispatchRouterContainer();
+
+        return new MessagingLocalContainerService<>(container, c -> container.defaultEndpoint());
+    }
 }
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
index 0b8bb52..d0273dc 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -35,9 +35,8 @@ import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
-import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
 import org.apache.camel.test.infra.messaging.services.MessagingService;
-import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
+import org.apache.camel.test.infra.messaging.services.MessagingServiceFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -55,9 +54,9 @@ import static org.junit.jupiter.api.Assertions.fail;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkIdempotentJMSITCase extends CamelSinkTestSupport {
     @RegisterExtension
-    public static MessagingService jmsService = MessagingServiceBuilder
-            .newBuilder(DispatchRouterContainer::new)
-            .withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
+    public static MessagingService jmsService = MessagingServiceFactory
+            .builder()
+            .addLocalMapping(SJMS2Common::createLocalService)
             .build();
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkIdempotentJMSITCase.class);
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
index 50dabe6..b56c580 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
@@ -30,9 +30,8 @@ import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
-import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
 import org.apache.camel.test.infra.messaging.services.MessagingService;
-import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
+import org.apache.camel.test.infra.messaging.services.MessagingServiceFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -50,9 +49,9 @@ import static org.junit.jupiter.api.Assertions.fail;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkJMSITCase extends CamelSinkTestSupport {
     @RegisterExtension
-    public static MessagingService jmsService = MessagingServiceBuilder
-            .newBuilder(DispatchRouterContainer::new)
-            .withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
+    public static MessagingService jmsService = MessagingServiceFactory
+            .builder()
+            .addLocalMapping(SJMS2Common::createLocalService)
             .build();
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJMSITCase.class);
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
index 781e029..ffbee12 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
@@ -27,9 +27,8 @@ import org.apache.camel.kafkaconnector.common.test.IntegerMessageConsumer;
 import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
-import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
 import org.apache.camel.test.infra.messaging.services.MessagingService;
-import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
+import org.apache.camel.test.infra.messaging.services.MessagingServiceFactory;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -47,9 +46,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSourceJMSITCase extends CamelSourceTestSupport {
     @RegisterExtension
-    public static MessagingService jmsService = MessagingServiceBuilder
-            .newBuilder(DispatchRouterContainer::new)
-            .withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
+    public static MessagingService jmsService = MessagingServiceFactory
+            .builder()
+            .addLocalMapping(SJMS2Common::createLocalService)
             .build();
 
     private String topicName;
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
index 6b95304..ce25c7b 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
@@ -28,9 +28,8 @@ import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
-import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
 import org.apache.camel.test.infra.messaging.services.MessagingService;
-import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
+import org.apache.camel.test.infra.messaging.services.MessagingServiceFactory;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -44,9 +43,9 @@ import static org.junit.jupiter.api.Assertions.fail;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSourceJMSWithAggregation extends CamelSourceTestSupport {
     @RegisterExtension
-    public static MessagingService jmsService = MessagingServiceBuilder
-            .newBuilder(DispatchRouterContainer::new)
-            .withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
+    public static MessagingService jmsService = MessagingServiceFactory
+            .builder()
+            .addLocalMapping(SJMS2Common::createLocalService)
             .build();
 
     private final int sentSize = 10;