You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/10/05 12:15:20 UTC

[camel-kafka-connector] branch master updated: Throttles strimzi startup to prevent resource reuse errors

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 996898b  Throttles strimzi startup to prevent resource reuse errors
996898b is described below

commit 996898b322ee393073922f7185253a365dd3bf1e
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sat Oct 3 14:18:53 2020 +0200

    Throttles strimzi startup to prevent resource reuse errors
---
 .../common/services/kafka/StrimziContainer.java    | 18 ++++--------
 .../common/services/kafka/StrimziService.java      | 32 ++++++++++++++++++----
 .../common/services/kafka/ZookeeperContainer.java  | 15 ++++------
 3 files changed, 38 insertions(+), 27 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziContainer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziContainer.java
index 7174547..e877a7e 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziContainer.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziContainer.java
@@ -17,9 +17,6 @@
 
 package org.apache.camel.kafkaconnector.common.services.kafka;
 
-import java.util.function.Consumer;
-
-import com.github.dockerjava.api.command.CreateContainerCmd;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.containers.wait.strategy.Wait;
@@ -28,24 +25,21 @@ public class StrimziContainer extends GenericContainer<StrimziContainer> {
     private static final String STRIMZI_CONTAINER = System.getProperty("itest.strimzi.container.image");
     private static final int KAFKA_PORT = 9092;
 
-    public StrimziContainer(Network network, String name) {
+    public StrimziContainer(Network network, String name, String zookeeperInstanceName) {
         super(STRIMZI_CONTAINER);
 
         withEnv("LOG_DIR", "/tmp/logs");
         withExposedPorts(KAFKA_PORT);
         withEnv("KAFKA_ADVERTISED_LISTENERS", String.format("PLAINTEXT://%s:9092", getContainerIpAddress()));
         withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092");
-        withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181");
+        withEnv("KAFKA_ZOOKEEPER_CONNECT", zookeeperInstanceName + ":2181");
         withNetwork(network);
 
         withCreateContainerCmdModifier(
-                new Consumer<CreateContainerCmd>() {
-                    @Override
-                    public void accept(CreateContainerCmd createContainerCmd) {
-                        createContainerCmd.withHostName(name);
-                        createContainerCmd.withName(name);
-                    }
-                }
+            createContainerCmd -> {
+                createContainerCmd.withHostName(name);
+                createContainerCmd.withName(name);
+            }
         );
 
         withCommand("sh", "-c",
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java
index ca10c18..3ee5f9c 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java
@@ -26,6 +26,8 @@ public class StrimziService implements KafkaService {
     private static final Logger LOG = LoggerFactory.getLogger(StrimziService.class);
     private static ZookeeperContainer zookeeperContainer;
     private static StrimziContainer strimziContainer;
+    private static String zookeeperInstanceName;
+    private static String strimziInstanceName;
 
     public StrimziService() {
 
@@ -33,12 +35,15 @@ public class StrimziService implements KafkaService {
 
             Network network = Network.newNetwork();
 
+
             if (zookeeperContainer == null) {
-                zookeeperContainer = new ZookeeperContainer(network, "zookeeper");
+                zookeeperInstanceName = "zookeeper-" + TestUtils.randomWithRange(1, 100);
+                zookeeperContainer = new ZookeeperContainer(network, zookeeperInstanceName);
             }
 
             if (strimziContainer == null) {
-                strimziContainer = new StrimziContainer(network, "strimzi");
+                strimziInstanceName = "strimzi-" + TestUtils.randomWithRange(1, 100);
+                strimziContainer = new StrimziContainer(network, strimziInstanceName, zookeeperInstanceName);
             }
         }
     }
@@ -55,10 +60,18 @@ public class StrimziService implements KafkaService {
     @Override
     public void initialize() {
         if (!zookeeperContainer.isRunning()) {
+            /*
+             When running multiple tests at once, this throttles the startup to give
+             Docker time to fully shut down previously running instances (which
+             happens asynchronously). This prevents spurious errors, such as Docker
+             complaining about multiple containers with the same name or about port
+             numbers being reused too quickly.
+             */
+            throttle();
             zookeeperContainer.start();
         }
 
-        String zookeeperConnect = "zookeeper:" + zookeeperContainer.getZookeeperPort();
+        String zookeeperConnect = zookeeperInstanceName + ":" + zookeeperContainer.getZookeeperPort();
         LOG.info("Apache Zookeeper running at address {}", zookeeperConnect);
 
         if (!strimziContainer.isRunning()) {
@@ -68,6 +81,15 @@ public class StrimziService implements KafkaService {
         LOG.info("Kafka bootstrap server running at address {}", getBootstrapServers());
     }
 
+    private void throttle() {
+        try {
+            String throttleDelay = System.getProperty("itest.strimzi.throttle.delay", "10000");
+            Thread.sleep(Integer.parseInt(throttleDelay));
+        } catch (InterruptedException e) {
+            LOG.warn("Strimzi startup interrupted");
+        }
+    }
+
     private boolean stopped() {
         return !strimziContainer.isRunning() && !zookeeperContainer.isRunning();
     }
@@ -77,11 +99,11 @@ public class StrimziService implements KafkaService {
         try {
             LOG.info("Stopping Kafka container");
             strimziContainer.stop();
-
-            TestUtils.waitFor(this::stopped);
         } finally {
             LOG.info("Stopping Zookeeper container");
             zookeeperContainer.stop();
+
+            TestUtils.waitFor(this::stopped);
         }
     }
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/ZookeeperContainer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/ZookeeperContainer.java
index 7b1ac04..4307e38 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/ZookeeperContainer.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/ZookeeperContainer.java
@@ -17,9 +17,6 @@
 
 package org.apache.camel.kafkaconnector.common.services.kafka;
 
-import java.util.function.Consumer;
-
-import com.github.dockerjava.api.command.CreateContainerCmd;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.containers.wait.strategy.Wait;
@@ -34,14 +31,12 @@ public class ZookeeperContainer extends GenericContainer<ZookeeperContainer> {
         withEnv("LOG_DIR", "/tmp/logs");
         withExposedPorts(ZOOKEEPER_PORT);
         withNetwork(network);
+
         withCreateContainerCmdModifier(
-                new Consumer<CreateContainerCmd>() {
-                    @Override
-                    public void accept(CreateContainerCmd createContainerCmd) {
-                        createContainerCmd.withHostName(name);
-                        createContainerCmd.withName(name);
-                    }
-                }
+            createContainerCmd -> {
+                createContainerCmd.withHostName(name);
+                createContainerCmd.withName(name);
+            }
         );
 
         withCommand("sh", "-c",