You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/21 17:46:29 UTC
[pulsar] branch master updated: [Flaky-test] BatchSourceExecutorTest.testLifeCycle (#10870) (#14717)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new be13b2503a5 [Flaky-test] BatchSourceExecutorTest.testLifeCycle (#10870) (#14717)
be13b2503a5 is described below
commit be13b2503a5bb47d672587c38408436c840f349e
Author: wuxuanqicn <89...@users.noreply.github.com>
AuthorDate: Fri Apr 22 01:46:22 2022 +0800
[Flaky-test] BatchSourceExecutorTest.testLifeCycle (#10870) (#14717)
### Motivation
The trigger event is dropped in BatchSourceExecutor#triggerDiscover when discoverInProgress is still true, which makes the test flaky.
https://github.com/apache/pulsar/blob/f0d166f36e1fbd4df1e20ae2ccc7fcae822c17b4/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java#L161-L182
### Modifications
Wait until the task submitted in BatchSourceExecutor#triggerDiscover has completed and discoverInProgress has been reset to false before sending the next trigger.
Co-authored-by: xuanqi.wu <xu...@weimob.com>
---
pulsar-functions/instance/pom.xml | 6 ++++++
.../pulsar/functions/source/batch/BatchSourceExecutorTest.java | 9 +++++++++
2 files changed, 15 insertions(+)
diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index 500a4815798..04524d9d714 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -194,6 +194,12 @@
<version>${prometheus-jmx.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
index c0ceab02e36..4432a0f2e7e 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.source.batch;
+import static org.awaitility.Awaitility.await;
import static org.testng.Assert.fail;
import com.google.gson.Gson;
import java.util.HashMap;
@@ -378,6 +379,8 @@ public class BatchSourceExecutorTest {
}
Assert.assertEquals(testBatchSource.getRecordCount(), 6);
Assert.assertEquals(testBatchSource.getDiscoverCount(), 1);
+
+ awaitDiscoverNotInProgress();
triggerQueue.put("trigger");
completedQueue.take();
Assert.assertTrue(testBatchSource.getDiscoverCount() == 2);
@@ -397,6 +400,8 @@ public class BatchSourceExecutorTest {
}
Assert.assertEquals(testBatchPushSource.getRecordCount(), 5);
Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 1);
+
+ awaitDiscoverNotInProgress();
triggerQueue.put("trigger");
completedQueue.take();
Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 2);
@@ -416,4 +421,8 @@ public class BatchSourceExecutorTest {
fail("should have thrown an exception");
}
+ private void awaitDiscoverNotInProgress() {
+ await().until(() -> !batchSourceExecutor.discoverInProgress);
+ }
+
}