You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/01/13 06:09:39 UTC
[camel-kafka-connector] 01/02: Ensure the system resources are
correctly released
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 8a089d06d7f921c20bdee2ce10d70dff67b07780
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Jan 12 10:53:50 2021 +0100
Ensure the system resources are correctly released
---
.../common/services/kafka/KafkaServiceFactory.java | 6 +-
.../common/services/kafka/StrimziService.java | 103 +++++++++++++++++++++
2 files changed, 108 insertions(+), 1 deletion(-)
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java
index c1b51c4..052ba9c 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java
@@ -35,7 +35,11 @@ public final class KafkaServiceFactory {
return new EmbeddedKafkaService();
}
+ if (kafkaInstanceType.equals("local-strimzi-container")) {
+ return new StrimziService();
+ }
+
return org.apache.camel.test.infra.kafka.services.KafkaServiceFactory.createService();
- }
+ }
}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java
new file mode 100644
index 0000000..6f3b03e
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common.services.kafka;
+
+import org.apache.camel.test.infra.common.TestUtils;
+import org.apache.camel.test.infra.common.services.ContainerService;
+import org.apache.camel.test.infra.kafka.common.KafkaProperties;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
+import org.apache.camel.test.infra.kafka.services.StrimziContainer;
+import org.apache.camel.test.infra.kafka.services.ZookeeperContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+/**
+ * Kafka test service that drives a Strimzi container together with the
+ * Zookeeper container it is paired with. Zookeeper is started before Strimzi
+ * in {@link #initialize()} and the two are stopped in the opposite order in
+ * {@link #shutdown()}.
+ */
+class StrimziService implements KafkaService, ContainerService<StrimziContainer> {
+ private static final Logger LOG = LoggerFactory.getLogger(StrimziService.class);
+ // Containers driven by this service; assigned once by whichever constructor is used.
+ private final ZookeeperContainer zookeeperContainer;
+ private final StrimziContainer strimziContainer;
+ /**
+ * Builds a Zookeeper container and a Strimzi container attached to a newly
+ * created network. Each instance name gets a suffix taken from
+ * {@code TestUtils.randomWithRange(1, 100)}, and the Zookeeper instance name
+ * is also handed to the Strimzi container.
+ * NOTE(review): the network is only held in a local variable, so this class
+ * never closes it itself; confirm that it is cleaned up elsewhere.
+ */
+ public StrimziService() {
+ Network network = Network.newNetwork();
+ // NOTE(review): instance names presumably need to be unique per run; the 1 to 100 range looks small, confirm collisions are not a concern.
+ String zookeeperInstanceName = "zookeeper-" + TestUtils.randomWithRange(1, 100);
+ zookeeperContainer = new ZookeeperContainer(network, zookeeperInstanceName);
+ // The Strimzi container receives the same network plus the Zookeeper instance name chosen above.
+ String strimziInstanceName = "strimzi-" + TestUtils.randomWithRange(1, 100);
+ strimziContainer = new StrimziContainer(network, strimziInstanceName, zookeeperInstanceName);
+ }
+ /**
+ * Wraps containers created by the caller; nothing is started here.
+ *
+ * @param zookeeperContainer the Zookeeper container this service starts and stops
+ * @param strimziContainer the Strimzi container this service starts and stops
+ */
+ public StrimziService(ZookeeperContainer zookeeperContainer, StrimziContainer strimziContainer) {
+ this.zookeeperContainer = zookeeperContainer;
+ this.strimziContainer = strimziContainer;
+ }
+ /**
+ * @return the Kafka port as reported by the Strimzi container
+ */
+ protected Integer getKafkaPort() {
+ return strimziContainer.getKafkaPort();
+ }
+ /**
+ * @return the Strimzi container's IP address and the Kafka port joined as {@code address:port}
+ */
+ @Override
+ public String getBootstrapServers() {
+ return strimziContainer.getContainerIpAddress() + ":" + getKafkaPort();
+ }
+ /**
+ * Stores the value of {@link #getBootstrapServers()} in the system property
+ * named by {@code KafkaProperties.KAFKA_BOOTSTRAP_SERVERS}.
+ */
+ @Override
+ public void registerProperties() {
+ System.setProperty(KafkaProperties.KAFKA_BOOTSTRAP_SERVERS, getBootstrapServers());
+ }
+ /**
+ * Starts the Zookeeper container, then the Strimzi container, and finally
+ * registers the bootstrap servers system property. The address of each
+ * service is logged once it has been started.
+ */
+ @Override
+ public void initialize() {
+ zookeeperContainer.start();
+ // The connect string is computed only for the log message below.
+ String zookeeperConnect = zookeeperContainer.getContainerIpAddress() + ":" + zookeeperContainer.getZookeeperPort();
+ LOG.info("Apache Zookeeper running at address {}", zookeeperConnect);
+ // Zookeeper has been started at this point, so the broker can be started next.
+ strimziContainer.start();
+ // Register the bootstrap address only after the broker container has been started.
+ registerProperties();
+ LOG.info("Kafka bootstrap server running at address {}", getBootstrapServers());
+ }
+ /**
+ * @return {@code true} only when neither the Strimzi nor the Zookeeper container reports itself as running
+ */
+ private boolean stopped() {
+ return !strimziContainer.isRunning() && !zookeeperContainer.isRunning();
+ }
+ /**
+ * Stops the Strimzi container and then the Zookeeper container. The
+ * Zookeeper stop sits in a {@code finally} block, so it is attempted even
+ * when stopping Strimzi throws.
+ */
+ @Override
+ public void shutdown() {
+ try {
+ LOG.info("Stopping Kafka container");
+ strimziContainer.stop();
+ } finally {
+ LOG.info("Stopping Zookeeper container");
+ zookeeperContainer.stop();
+ // NOTE(review): presumably waits until stopped() returns true; this call is skipped if the stop() above throws.
+ TestUtils.waitFor(this::stopped);
+ }
+ }
+ /**
+ * @return the Strimzi container managed by this service
+ */
+ @Override
+ public StrimziContainer getContainer() {
+ return strimziContainer;
+ }
+ /**
+ * @return the Zookeeper container managed by this service
+ */
+ protected ZookeeperContainer getZookeeperContainer() {
+ return zookeeperContainer;
+ }
+}