You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/21 17:46:29 UTC

[pulsar] branch master updated: [Flaky-test] BatchSourceExecutorTest.testLifeCycle (#10870) (#14717)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new be13b2503a5 [Flaky-test] BatchSourceExecutorTest.testLifeCycle (#10870) (#14717)
be13b2503a5 is described below

commit be13b2503a5bb47d672587c38408436c840f349e
Author: wuxuanqicn <89...@users.noreply.github.com>
AuthorDate: Fri Apr 22 01:46:22 2022 +0800

    [Flaky-test] BatchSourceExecutorTest.testLifeCycle (#10870) (#14717)
    
    ### Motivation
    
    event will be dropped at BatchSourceExecutor#triggerDiscover when discoverInProgress is true
    
    https://github.com/apache/pulsar/blob/f0d166f36e1fbd4df1e20ae2ccc7fcae822c17b4/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java#L161-L182
    
    ### Modifications
    
    await util task submitted in BatchSourceExecutor#triggerDiscover execute completed and discoverInProgress update to false
    
    Co-authored-by: xuanqi.wu <xu...@weimob.com>
---
 pulsar-functions/instance/pom.xml                                | 6 ++++++
 .../pulsar/functions/source/batch/BatchSourceExecutorTest.java   | 9 +++++++++
 2 files changed, 15 insertions(+)

diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index 500a4815798..04524d9d714 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -194,6 +194,12 @@
       <version>${prometheus-jmx.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
index c0ceab02e36..4432a0f2e7e 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.source.batch;
 
 
+import static org.awaitility.Awaitility.await;
 import static org.testng.Assert.fail;
 import com.google.gson.Gson;
 import java.util.HashMap;
@@ -378,6 +379,8 @@ public class BatchSourceExecutorTest {
     }
     Assert.assertEquals(testBatchSource.getRecordCount(), 6);
     Assert.assertEquals(testBatchSource.getDiscoverCount(), 1);
+
+    awaitDiscoverNotInProgress();
     triggerQueue.put("trigger");
     completedQueue.take();
     Assert.assertTrue(testBatchSource.getDiscoverCount() == 2);
@@ -397,6 +400,8 @@ public class BatchSourceExecutorTest {
     }
     Assert.assertEquals(testBatchPushSource.getRecordCount(), 5);
     Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 1);
+
+    awaitDiscoverNotInProgress();
     triggerQueue.put("trigger");
     completedQueue.take();
     Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 2);
@@ -416,4 +421,8 @@ public class BatchSourceExecutorTest {
     fail("should have thrown an exception");
   }
 
+  private void awaitDiscoverNotInProgress() {
+    await().until(() -> !batchSourceExecutor.discoverInProgress);
+  }
+
 }