You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/05/25 06:10:51 UTC
[pulsar] branch branch-2.9 updated: [Flaky-test] BatchSourceExecutorTest.testLifeCycle (#10870) (#14717)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new e1b1c34d1fd [Flaky-test] BatchSourceExecutorTest.testLifeCycle (#10870) (#14717)
e1b1c34d1fd is described below
commit e1b1c34d1fd2a80a189c0db5e86d2888a8c718a9
Author: wuxuanqicn <89...@users.noreply.github.com>
AuthorDate: Fri Apr 22 01:46:22 2022 +0800
[Flaky-test] BatchSourceExecutorTest.testLifeCycle (#10870) (#14717)
### Motivation
A trigger event is dropped in BatchSourceExecutor#triggerDiscover when discoverInProgress is true:
https://github.com/apache/pulsar/blob/f0d166f36e1fbd4df1e20ae2ccc7fcae822c17b4/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java#L161-L182
### Modifications
Await until the task submitted in BatchSourceExecutor#triggerDiscover has completed and discoverInProgress has been updated to false.
Co-authored-by: xuanqi.wu <xu...@weimob.com>
(cherry picked from commit be13b2503a5bb47d672587c38408436c840f349e)
---
pulsar-functions/instance/pom.xml | 6 ++++++
.../functions/source/batch/BatchSourceExecutorTest.java | 11 ++++++++++-
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index efbb8fb6229..d5d37d32556 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -194,6 +194,12 @@
<version>${prometheus-jmx.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
index ff15a10ebbe..6715b74624b 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.functions.source.batch;
+import static org.awaitility.Awaitility.await;
+import static org.testng.Assert.fail;
import com.google.gson.Gson;
import lombok.Getter;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -44,7 +46,6 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
-import static org.testng.Assert.fail;
/**
* Unit tests for {@link org.apache.pulsar.functions.source.batch.BatchSourceExecutor}
@@ -368,6 +369,8 @@ public class BatchSourceExecutorTest {
}
Assert.assertEquals(testBatchSource.getRecordCount(), 6);
Assert.assertEquals(testBatchSource.getDiscoverCount(), 1);
+
+ awaitDiscoverNotInProgress();
triggerQueue.put("trigger");
completedQueue.take();
Assert.assertTrue(testBatchSource.getDiscoverCount() == 2);
@@ -387,6 +390,8 @@ public class BatchSourceExecutorTest {
}
Assert.assertEquals(testBatchPushSource.getRecordCount(), 5);
Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 1);
+
+ awaitDiscoverNotInProgress();
triggerQueue.put("trigger");
completedQueue.take();
Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 2);
@@ -406,4 +411,8 @@ public class BatchSourceExecutorTest {
fail("should have thrown an exception");
}
+ private void awaitDiscoverNotInProgress() {
+ await().until(() -> !batchSourceExecutor.discoverInProgress);
+ }
+
}
\ No newline at end of file