You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by cb...@apache.org on 2022/12/08 11:43:15 UTC
[pulsar-client-reactive] branch main updated: Fix flaky ReactiveMessagePipelineTest.handlingTimeout (#108)
This is an automated email from the ASF dual-hosted git repository.
cbornet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 099549b Fix flaky ReactiveMessagePipelineTest.handlingTimeout (#108)
099549b is described below
commit 099549bde39db84e73b59e93ff18599a87f3af29
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Dec 8 13:43:09 2022 +0200
Fix flaky ReactiveMessagePipelineTest.handlingTimeout (#108)
---
.../client/api/ReactiveMessagePipelineTest.java | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)
diff --git a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
index 05e8ba8..78e5bbb 100644
--- a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
+++ b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
@@ -189,7 +189,8 @@ class ReactiveMessagePipelineTest {
void handlingTimeout() throws Exception {
int numMessages = 10;
TestConsumer testConsumer = new TestConsumer(numMessages);
- CountDownLatch latch = new CountDownLatch(numMessages);
+ CountDownLatch latch = new CountDownLatch(1);
+ testConsumer.setFinishedCallback(latch::countDown);
AtomicReference<MessageId> timedoutMessageId = new AtomicReference<>();
Function<Message<String>, Publisher<Void>> messageHandler = (message) -> Mono.defer(() -> {
Duration delay;
@@ -200,13 +201,12 @@ class ReactiveMessagePipelineTest {
else {
delay = Duration.ofMillis(2);
}
- return Mono.delay(delay).doFinally((__) -> latch.countDown()).then();
+ return Mono.delay(delay).then();
});
try (ReactiveMessagePipeline pipeline = testConsumer.messagePipeline().messageHandler(messageHandler)
.handlingTimeout(Duration.ofMillis(5)).build()) {
pipeline.start();
assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue();
- pipeline.stop();
// 9 messages should have been acked
assertThat(testConsumer.getAcknowledgedMessages()).hasSize(9);
// 1 message should have been nacked
@@ -425,6 +425,8 @@ class ReactiveMessagePipelineTest {
private final int numMessages;
+ private volatile Runnable finishedCallback;
+
TestConsumer(int numMessages) {
this.numMessages = numMessages;
}
@@ -433,6 +435,10 @@ class ReactiveMessagePipelineTest {
private final List<MessageId> nackedMessages = new CopyOnWriteArrayList<>();
+ void setFinishedCallback(Runnable finishedCallback) {
+ this.finishedCallback = finishedCallback;
+ }
+
@Override
public <R> Flux<R> consumeMany(Function<Flux<Message<String>>, Publisher<MessageResult<R>>> messageHandler) {
return Flux.defer(() -> {
@@ -445,7 +451,11 @@ class ReactiveMessagePipelineTest {
else {
this.nackedMessages.add(result.getMessageId());
}
- }).mapNotNull(MessageResult::getValue);
+ }).mapNotNull(MessageResult::getValue).doFinally((__) -> {
+ if (this.finishedCallback != null) {
+ this.finishedCallback.run();
+ }
+ });
});
}