You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/22 15:59:52 UTC

[pulsar] 01/03: Flaky tests: ElasticSearchClientTests tests time out (#12694)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8435cfe95ee03aa115b065d2b132e6d2a9a8f49d
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Fri Nov 19 17:13:10 2021 +0100

    Flaky tests: ElasticSearchClientTests tests time out (#12694)
    
    (cherry picked from commit dc48d29dc5d37793b0b2e6da28e7ba9b79bd4e49)
---
 .../io/elasticsearch/ElasticSearchClientTests.java |  6 +--
 .../testcontainers/ChaosContainer.java             | 61 +++++++++++++++++-----
 2 files changed, 52 insertions(+), 15 deletions(-)

diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index aeacaf8..fb927c5 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -201,7 +201,7 @@ public class ElasticSearchClientTests {
                 assertEquals(mockRecord.failed, 0);
                 assertEquals(client.totalHits(index), 2);
 
-                ChaosContainer<?> chaosContainer = new ChaosContainer<>(container.getContainerName(), "15s");
+                ChaosContainer<?> chaosContainer = ChaosContainer.pauseContainerForSeconds(container.getContainerName(), 15);
                 chaosContainer.start();
 
                 client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
@@ -248,12 +248,12 @@ public class ElasticSearchClientTests {
                 });
                 client.flush();
                 Awaitility.await().untilAsserted(() -> {
-                    assertEquals(mockRecord.acked, 5);
                     assertEquals(mockRecord.failed, 0);
+                    assertEquals(mockRecord.acked, 5);
                     assertEquals(client.totalHits(index), 5);
                 });
 
-                ChaosContainer<?> chaosContainer = new ChaosContainer<>(container.getContainerName(), "30s");
+                ChaosContainer<?> chaosContainer = ChaosContainer.pauseContainerForSeconds(container.getContainerName(), 30);
                 chaosContainer.start();
                 Thread.sleep(1000L);
 
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ChaosContainer.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ChaosContainer.java
index 7e7734f..4b296bb 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ChaosContainer.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ChaosContainer.java
@@ -19,26 +19,63 @@
 package org.apache.pulsar.io.elasticsearch.testcontainers;
 
 import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
 import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 // see https://github.com/alexei-led/pumba
 @Slf4j
 public class ChaosContainer<SELF extends ChaosContainer<SELF>> extends GenericContainer<SELF> {
 
-  public static final String PUMBA_IMAGE = Optional.ofNullable(System.getenv("PUMBA_IMAGE"))
-          .orElse("gaiaadm/pumba:latest");
-
-  public ChaosContainer(String targetContainer, String pause) {
-    super(PUMBA_IMAGE);
-    setCommand("--log-level info --interval 60s pause --duration " + pause + " " + targetContainer);
-    addFileSystemBind("/var/run/docker.sock", "/var/run/docker.sock", BindMode.READ_WRITE);
-    setWaitStrategy(Wait.forLogMessage(".*pausing container.*", 1));
-    withLogConsumer(o -> {
-      log.info("pumba> {}", o.getUtf8String());
-    });
-  }
+    public static final String PUMBA_IMAGE = Optional.ofNullable(System.getenv("PUMBA_IMAGE"))
+            .orElse("gaiaadm/pumba:0.8.0");
+
+    private final List<String> logs = new java.util.concurrent.CopyOnWriteArrayList<>(); // appended by the log-consumer callback while stop() polls it, so it must tolerate concurrent access
+    private Consumer<ChaosContainer> beforeStop;
+
+    public static ChaosContainer pauseContainerForSeconds(String targetContainer, int seconds) {
+        return new ChaosContainer(targetContainer, "pause --duration " + seconds + "s", Wait.forLogMessage(".*pausing container.*", 1),
+                (Consumer<ChaosContainer>) chaosContainer -> Awaitility
+                        .await()
+                        .atMost(seconds + 5, TimeUnit.SECONDS)
+                        .until(() -> {
+                                    boolean found = chaosContainer.logs.stream().anyMatch((Predicate<String>) line -> line.contains("stop pausing container"));
+                                    if (!found) {
+                                        log.debug("ChaosContainer stop requested. waiting for \"stop pausing container\" log");
+                                        log.debug(String.join("\n", chaosContainer.logs));
+                                    }
+                                    return found;
+                                }
+                        ));
+    }
+
+    private ChaosContainer(String targetContainer, String command, WaitStrategy waitStrategy, Consumer<ChaosContainer> beforeStop) {
+        super(PUMBA_IMAGE);
+        setCommand("--log-level info " + command + " " + targetContainer);
+        addFileSystemBind("/var/run/docker.sock", "/var/run/docker.sock", BindMode.READ_WRITE);
+        setWaitStrategy(waitStrategy);
+        withLogConsumer(o -> {
+            final String string = o.getUtf8String();
+            log.info("pumba> {}", string);
+            logs.add(string);
+        });
+        this.beforeStop = beforeStop;
+    }
+
+    @Override
+    public void stop() {
+        if (getContainerId() != null && beforeStop != null) {
+            beforeStop.accept(this);
+        }
+        super.stop();
+    }
 }
\ No newline at end of file