You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/05/25 06:10:51 UTC

[pulsar] branch branch-2.9 updated: [Flaky-test] BatchSourceExecutorTest.testLifeCycle (#10870) (#14717)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new e1b1c34d1fd [Flaky-test] BatchSourceExecutorTest.testLifeCycle (#10870) (#14717)
e1b1c34d1fd is described below

commit e1b1c34d1fd2a80a189c0db5e86d2888a8c718a9
Author: wuxuanqicn <89...@users.noreply.github.com>
AuthorDate: Fri Apr 22 01:46:22 2022 +0800

    [Flaky-test] BatchSourceExecutorTest.testLifeCycle (#10870) (#14717)
    
    ### Motivation
    
    A trigger event is dropped by BatchSourceExecutor#triggerDiscover if it arrives while discoverInProgress is still true, which makes the test flaky.
    
    https://github.com/apache/pulsar/blob/f0d166f36e1fbd4df1e20ae2ccc7fcae822c17b4/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java#L161-L182
    
    ### Modifications
    
    Before sending the next trigger, await until the task submitted in BatchSourceExecutor#triggerDiscover has completed and discoverInProgress has been set back to false.
    
    Co-authored-by: xuanqi.wu <xu...@weimob.com>
    
    (cherry picked from commit be13b2503a5bb47d672587c38408436c840f349e)
---
 pulsar-functions/instance/pom.xml                             |  6 ++++++
 .../functions/source/batch/BatchSourceExecutorTest.java       | 11 ++++++++++-
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index efbb8fb6229..d5d37d32556 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -194,6 +194,12 @@
       <version>${prometheus-jmx.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
index ff15a10ebbe..6715b74624b 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.functions.source.batch;
 
 
+import static org.awaitility.Awaitility.await;
+import static org.testng.Assert.fail;
 import com.google.gson.Gson;
 import lombok.Getter;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -44,7 +46,6 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
-import static org.testng.Assert.fail;
 
 /**
  * Unit tests for {@link org.apache.pulsar.functions.source.batch.BatchSourceExecutor}
@@ -368,6 +369,8 @@ public class BatchSourceExecutorTest {
     }
     Assert.assertEquals(testBatchSource.getRecordCount(), 6);
     Assert.assertEquals(testBatchSource.getDiscoverCount(), 1);
+
+    awaitDiscoverNotInProgress();
     triggerQueue.put("trigger");
     completedQueue.take();
     Assert.assertTrue(testBatchSource.getDiscoverCount() == 2);
@@ -387,6 +390,8 @@ public class BatchSourceExecutorTest {
     }
     Assert.assertEquals(testBatchPushSource.getRecordCount(), 5);
     Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 1);
+
+    awaitDiscoverNotInProgress();
     triggerQueue.put("trigger");
     completedQueue.take();
     Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 2);
@@ -406,4 +411,8 @@ public class BatchSourceExecutorTest {
     fail("should have thrown an exception");
   }
 
+  private void awaitDiscoverNotInProgress() {
+    await().until(() -> !batchSourceExecutor.discoverInProgress);
+  }
+
 }
\ No newline at end of file