You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by cb...@apache.org on 2022/12/08 11:43:15 UTC

[pulsar-client-reactive] branch main updated: Fix flaky ReactiveMessagePipelineTest.handlingTimeout (#108)

This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 099549b  Fix flaky ReactiveMessagePipelineTest.handlingTimeout (#108)
099549b is described below

commit 099549bde39db84e73b59e93ff18599a87f3af29
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Dec 8 13:43:09 2022 +0200

    Fix flaky ReactiveMessagePipelineTest.handlingTimeout (#108)
---
 .../client/api/ReactiveMessagePipelineTest.java        | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
index 05e8ba8..78e5bbb 100644
--- a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
+++ b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
@@ -189,7 +189,8 @@ class ReactiveMessagePipelineTest {
 	void handlingTimeout() throws Exception {
 		int numMessages = 10;
 		TestConsumer testConsumer = new TestConsumer(numMessages);
-		CountDownLatch latch = new CountDownLatch(numMessages);
+		CountDownLatch latch = new CountDownLatch(1);
+		testConsumer.setFinishedCallback(latch::countDown);
 		AtomicReference<MessageId> timedoutMessageId = new AtomicReference<>();
 		Function<Message<String>, Publisher<Void>> messageHandler = (message) -> Mono.defer(() -> {
 			Duration delay;
@@ -200,13 +201,12 @@ class ReactiveMessagePipelineTest {
 			else {
 				delay = Duration.ofMillis(2);
 			}
-			return Mono.delay(delay).doFinally((__) -> latch.countDown()).then();
+			return Mono.delay(delay).then();
 		});
 		try (ReactiveMessagePipeline pipeline = testConsumer.messagePipeline().messageHandler(messageHandler)
 				.handlingTimeout(Duration.ofMillis(5)).build()) {
 			pipeline.start();
 			assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue();
-			pipeline.stop();
 			// 9 messages should have been acked
 			assertThat(testConsumer.getAcknowledgedMessages()).hasSize(9);
 			// 1 message should have been nacked
@@ -425,6 +425,8 @@ class ReactiveMessagePipelineTest {
 
 		private final int numMessages;
 
+		private volatile Runnable finishedCallback;
+
 		TestConsumer(int numMessages) {
 			this.numMessages = numMessages;
 		}
@@ -433,6 +435,10 @@ class ReactiveMessagePipelineTest {
 
 		private final List<MessageId> nackedMessages = new CopyOnWriteArrayList<>();
 
+		void setFinishedCallback(Runnable finishedCallback) {
+			this.finishedCallback = finishedCallback;
+		}
+
 		@Override
 		public <R> Flux<R> consumeMany(Function<Flux<Message<String>>, Publisher<MessageResult<R>>> messageHandler) {
 			return Flux.defer(() -> {
@@ -445,7 +451,11 @@ class ReactiveMessagePipelineTest {
 					else {
 						this.nackedMessages.add(result.getMessageId());
 					}
-				}).mapNotNull(MessageResult::getValue);
+				}).mapNotNull(MessageResult::getValue).doFinally((__) -> {
+					if (this.finishedCallback != null) {
+						this.finishedCallback.run();
+					}
+				});
 			});
 		}