You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/10/05 12:15:20 UTC
[camel-kafka-connector] branch master updated: Throttles strimzi startup to prevent resource reuse errors
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 996898b Throttles strimzi startup to prevent resource reuse errors
996898b is described below
commit 996898b322ee393073922f7185253a365dd3bf1e
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sat Oct 3 14:18:53 2020 +0200
Throttles strimzi startup to prevent resource reuse errors
---
.../common/services/kafka/StrimziContainer.java | 18 ++++--------
.../common/services/kafka/StrimziService.java | 32 ++++++++++++++++++----
.../common/services/kafka/ZookeeperContainer.java | 15 ++++------
3 files changed, 38 insertions(+), 27 deletions(-)
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziContainer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziContainer.java
index 7174547..e877a7e 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziContainer.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziContainer.java
@@ -17,9 +17,6 @@
package org.apache.camel.kafkaconnector.common.services.kafka;
-import java.util.function.Consumer;
-
-import com.github.dockerjava.api.command.CreateContainerCmd;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
@@ -28,24 +25,21 @@ public class StrimziContainer extends GenericContainer<StrimziContainer> {
private static final String STRIMZI_CONTAINER = System.getProperty("itest.strimzi.container.image");
private static final int KAFKA_PORT = 9092;
- public StrimziContainer(Network network, String name) {
+ public StrimziContainer(Network network, String name, String zookeeperInstanceName) {
super(STRIMZI_CONTAINER);
withEnv("LOG_DIR", "/tmp/logs");
withExposedPorts(KAFKA_PORT);
withEnv("KAFKA_ADVERTISED_LISTENERS", String.format("PLAINTEXT://%s:9092", getContainerIpAddress()));
withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092");
- withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181");
+ withEnv("KAFKA_ZOOKEEPER_CONNECT", zookeeperInstanceName + ":2181");
withNetwork(network);
withCreateContainerCmdModifier(
- new Consumer<CreateContainerCmd>() {
- @Override
- public void accept(CreateContainerCmd createContainerCmd) {
- createContainerCmd.withHostName(name);
- createContainerCmd.withName(name);
- }
- }
+ createContainerCmd -> {
+ createContainerCmd.withHostName(name);
+ createContainerCmd.withName(name);
+ }
);
withCommand("sh", "-c",
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java
index ca10c18..3ee5f9c 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java
@@ -26,6 +26,8 @@ public class StrimziService implements KafkaService {
private static final Logger LOG = LoggerFactory.getLogger(StrimziService.class);
private static ZookeeperContainer zookeeperContainer;
private static StrimziContainer strimziContainer;
+ private static String zookeeperInstanceName;
+ private static String strimziInstanceName;
public StrimziService() {
@@ -33,12 +35,15 @@ public class StrimziService implements KafkaService {
Network network = Network.newNetwork();
+
if (zookeeperContainer == null) {
- zookeeperContainer = new ZookeeperContainer(network, "zookeeper");
+ zookeeperInstanceName = "zookeeper-" + TestUtils.randomWithRange(1, 100);
+ zookeeperContainer = new ZookeeperContainer(network, zookeeperInstanceName);
}
if (strimziContainer == null) {
- strimziContainer = new StrimziContainer(network, "strimzi");
+ strimziInstanceName = "strimzi-" + TestUtils.randomWithRange(1, 100);
+ strimziContainer = new StrimziContainer(network, strimziInstanceName, zookeeperInstanceName);
}
}
}
@@ -55,10 +60,18 @@ public class StrimziService implements KafkaService {
@Override
public void initialize() {
if (!zookeeperContainer.isRunning()) {
+ /*
+ When running multiple tests at once, this throttles the startup to give
+ time for docker to fully shut down previously running instances (which
+ happens asynchronously). This prevents problems with false positive errors
+ such as docker complaining of multiple containers with the same name or
+ trying to reuse port numbers too quickly.
+ */
+ throttle();
zookeeperContainer.start();
}
- String zookeeperConnect = "zookeeper:" + zookeeperContainer.getZookeeperPort();
+ String zookeeperConnect = zookeeperInstanceName + ":" + zookeeperContainer.getZookeeperPort();
LOG.info("Apache Zookeeper running at address {}", zookeeperConnect);
if (!strimziContainer.isRunning()) {
@@ -68,6 +81,15 @@ public class StrimziService implements KafkaService {
LOG.info("Kafka bootstrap server running at address {}", getBootstrapServers());
}
+ private void throttle() {
+ try {
+ String throttleDelay = System.getProperty("itest.strimzi.throttle.delay", "10000");
+ Thread.sleep(Integer.parseInt(throttleDelay));
+ } catch (InterruptedException e) {
+ LOG.warn("Strimzi startup interrupted");
+ }
+ }
+
private boolean stopped() {
return !strimziContainer.isRunning() && !zookeeperContainer.isRunning();
}
@@ -77,11 +99,11 @@ public class StrimziService implements KafkaService {
try {
LOG.info("Stopping Kafka container");
strimziContainer.stop();
-
- TestUtils.waitFor(this::stopped);
} finally {
LOG.info("Stopping Zookeeper container");
zookeeperContainer.stop();
+
+ TestUtils.waitFor(this::stopped);
}
}
}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/ZookeeperContainer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/ZookeeperContainer.java
index 7b1ac04..4307e38 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/ZookeeperContainer.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/ZookeeperContainer.java
@@ -17,9 +17,6 @@
package org.apache.camel.kafkaconnector.common.services.kafka;
-import java.util.function.Consumer;
-
-import com.github.dockerjava.api.command.CreateContainerCmd;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
@@ -34,14 +31,12 @@ public class ZookeeperContainer extends GenericContainer<ZookeeperContainer> {
withEnv("LOG_DIR", "/tmp/logs");
withExposedPorts(ZOOKEEPER_PORT);
withNetwork(network);
+
withCreateContainerCmdModifier(
- new Consumer<CreateContainerCmd>() {
- @Override
- public void accept(CreateContainerCmd createContainerCmd) {
- createContainerCmd.withHostName(name);
- createContainerCmd.withName(name);
- }
- }
+ createContainerCmd -> {
+ createContainerCmd.withHostName(name);
+ createContainerCmd.withName(name);
+ }
);
withCommand("sh", "-c",