You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/22 15:59:52 UTC
[pulsar] 01/03: Flaky tests: ElasticSearchClientTests tests time out (#12694)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8435cfe95ee03aa115b065d2b132e6d2a9a8f49d
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Fri Nov 19 17:13:10 2021 +0100
Flaky tests: ElasticSearchClientTests tests time out (#12694)
(cherry picked from commit dc48d29dc5d37793b0b2e6da28e7ba9b79bd4e49)
---
.../io/elasticsearch/ElasticSearchClientTests.java | 6 +--
.../testcontainers/ChaosContainer.java | 61 +++++++++++++++++-----
2 files changed, 52 insertions(+), 15 deletions(-)
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index aeacaf8..fb927c5 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -201,7 +201,7 @@ public class ElasticSearchClientTests {
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 2);
- ChaosContainer<?> chaosContainer = new ChaosContainer<>(container.getContainerName(), "15s");
+ ChaosContainer<?> chaosContainer = ChaosContainer.pauseContainerForSeconds(container.getContainerName(), 15);
chaosContainer.start();
client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
@@ -248,12 +248,12 @@ public class ElasticSearchClientTests {
});
client.flush();
Awaitility.await().untilAsserted(() -> {
- assertEquals(mockRecord.acked, 5);
assertEquals(mockRecord.failed, 0);
+ assertEquals(mockRecord.acked, 5);
assertEquals(client.totalHits(index), 5);
});
- ChaosContainer<?> chaosContainer = new ChaosContainer<>(container.getContainerName(), "30s");
+ ChaosContainer<?> chaosContainer = ChaosContainer.pauseContainerForSeconds(container.getContainerName(), 30);
chaosContainer.start();
Thread.sleep(1000L);
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ChaosContainer.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ChaosContainer.java
index 7e7734f..4b296bb 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ChaosContainer.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ChaosContainer.java
@@ -19,26 +19,63 @@
package org.apache.pulsar.io.elasticsearch.testcontainers;
import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
// see https://github.com/alexei-led/pumba
@Slf4j
public class ChaosContainer<SELF extends ChaosContainer<SELF>> extends GenericContainer<SELF> {
- public static final String PUMBA_IMAGE = Optional.ofNullable(System.getenv("PUMBA_IMAGE"))
- .orElse("gaiaadm/pumba:latest");
-
- public ChaosContainer(String targetContainer, String pause) {
- super(PUMBA_IMAGE);
- setCommand("--log-level info --interval 60s pause --duration " + pause + " " + targetContainer);
- addFileSystemBind("/var/run/docker.sock", "/var/run/docker.sock", BindMode.READ_WRITE);
- setWaitStrategy(Wait.forLogMessage(".*pausing container.*", 1));
- withLogConsumer(o -> {
- log.info("pumba> {}", o.getUtf8String());
- });
- }
+ public static final String PUMBA_IMAGE = Optional.ofNullable(System.getenv("PUMBA_IMAGE"))
+ .orElse("gaiaadm/pumba:0.8.0");
+
+ private final List<String> logs = new ArrayList<>();
+ private Consumer<ChaosContainer> beforeStop;
+
+ public static ChaosContainer pauseContainerForSeconds(String targetContainer, int seconds) {
+ return new ChaosContainer(targetContainer, "pause --duration " + seconds + "s", Wait.forLogMessage(".*pausing container.*", 1),
+ (Consumer<ChaosContainer>) chaosContainer -> Awaitility
+ .await()
+ .atMost(seconds + 5, TimeUnit.SECONDS)
+ .until(() -> {
+ boolean found = chaosContainer.logs.stream().anyMatch((Predicate<String>) line -> line.contains("stop pausing container"));
+ if (!found) {
+ log.debug("ChaosContainer stop requested. waiting for \"stop pausing container\" log");
+ log.debug(String.join("\n", chaosContainer.logs));
+ }
+ return found;
+ }
+ ));
+ }
+
+ private ChaosContainer(String targetContainer, String command, WaitStrategy waitStrategy, Consumer<ChaosContainer> beforeStop) {
+ super(PUMBA_IMAGE);
+ setCommand("--log-level info " + command + " " + targetContainer);
+ addFileSystemBind("/var/run/docker.sock", "/var/run/docker.sock", BindMode.READ_WRITE);
+ setWaitStrategy(waitStrategy);
+ withLogConsumer(o -> {
+ final String string = o.getUtf8String();
+ log.info("pumba> {}", string);
+ logs.add(string);
+ });
+ this.beforeStop = beforeStop;
+ }
+
+ @Override
+ public void stop() {
+ if (getContainerId() != null && beforeStop != null) {
+ beforeStop.accept(this);
+ }
+ super.stop();
+ }
}
\ No newline at end of file