You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/12/01 12:19:41 UTC

[pulsar-client-reactive] branch main updated: Fix InflightCounter used in ReactiveMessagePipelineTest (#69)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new a518e87  Fix InflightCounter used in ReactiveMessagePipelineTest (#69)
a518e87 is described below

commit a518e878a0a8aaeb81885a1c49a017d4e5d70e7c
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Dec 1 14:19:35 2022 +0200

    Fix InflightCounter used in ReactiveMessagePipelineTest (#69)
    
    * Fix InflightCounter used in ReactiveMessagePipelineTest
    
    - the update function passed to max.updateAndGet might get invoked multiple times when there are races, so it must not increment the current counter as a side effect
    
    * Fix flaky ReactiveMessagePipelineTest.handlingTimeout
    
    * Rename test method errorHandler -> errorLogger
---
 .../client/api/ReactiveMessagePipelineTest.java    | 91 ++++++++++++++--------
 1 file changed, 59 insertions(+), 32 deletions(-)

diff --git a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
index 5b55b17..ea8d548 100644
--- a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
+++ b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
@@ -17,18 +17,23 @@
 package org.apache.pulsar.reactive.client.api;
 
 import java.time.Duration;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -181,36 +186,35 @@ class ReactiveMessagePipelineTest {
 	void handlingTimeout() throws Exception {
 		int numMessages = 10;
 		TestConsumer testConsumer = new TestConsumer(numMessages);
-		CountDownLatch latch1 = new CountDownLatch(numMessages);
-		CountDownLatch latch2 = new CountDownLatch(numMessages);
-		Function<Message<String>, Publisher<Void>> messageHandler = (message) -> {
-			latch1.countDown();
-			return Mono.delay(Duration.ofMillis(2)).doOnNext((it) -> latch2.countDown()).then();
-		};
-		try (ReactiveMessagePipeline pipeline = testConsumer.messagePipeline().messageHandler(messageHandler).build()) {
-			pipeline.start();
-			assertThat(latch1.await(10000, TimeUnit.MILLISECONDS)).isTrue();
-			assertThat(latch2.await(10, TimeUnit.MILLISECONDS)).isTrue();
-		}
-
-		CountDownLatch latch3 = new CountDownLatch(numMessages);
-		CountDownLatch latch4 = new CountDownLatch(numMessages);
-		Function<Message<String>, Publisher<Void>> messageHandler2 = (message) -> {
-			latch3.countDown();
-			return Mono.delay(Duration.ofMillis(2)).doOnNext((it) -> latch4.countDown()).then();
-		};
-		try (ReactiveMessagePipeline pipeline = testConsumer.messagePipeline().messageHandler(messageHandler2)
-				.handlingTimeout(Duration.ofMillis(1)).build()) {
+		CountDownLatch latch = new CountDownLatch(numMessages);
+		AtomicReference<MessageId> timedoutMessageId = new AtomicReference<>();
+		Function<Message<String>, Publisher<Void>> messageHandler = (message) -> Mono.defer(() -> {
+			Duration delay;
+			if (message.getValue().equals("5")) {
+				delay = Duration.ofMillis(15);
+				timedoutMessageId.set(message.getMessageId());
+			}
+			else {
+				delay = Duration.ofMillis(2);
+			}
+			return Mono.delay(delay).doFinally((__) -> latch.countDown()).then();
+		});
+		try (ReactiveMessagePipeline pipeline = testConsumer.messagePipeline().messageHandler(messageHandler)
+				.handlingTimeout(Duration.ofMillis(5)).build()) {
 			pipeline.start();
-			assertThat(latch3.await(100, TimeUnit.MILLISECONDS)).isTrue();
-			assertThat(latch4.await(10, TimeUnit.MILLISECONDS)).isFalse();
-			assertThat(latch4.getCount()).isEqualTo(10);
+			assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue();
+			// 9 messages should have been acked
+			assertThat(testConsumer.getAcknowledgedMessages()).hasSize(9);
+			// 1 message should have been nacked
+			assertThat(testConsumer.getNackedMessages()).hasSize(1);
+			assertThat(timedoutMessageId).isNotNull();
+			// the nacked message id should be the one with the longer processing delay
+			assertThat(testConsumer.getNackedMessages()).first().isEqualTo(timedoutMessageId.get());
 		}
-
 	}
 
 	@Test
-	void errorHandler() throws Exception {
+	void errorLogger() throws Exception {
 		int numMessages = 10;
 		TestConsumer testConsumer = new TestConsumer(numMessages);
 		CountDownLatch latch = new CountDownLatch(numMessages);
@@ -256,7 +260,7 @@ class ReactiveMessagePipelineTest {
 			pipeline.start();
 			assertThat(latch2.await(1, TimeUnit.SECONDS)).isTrue();
 		}
-		assertThat(inflightCounterConcurrency.getMax()).isGreaterThan(100);
+		assertThat(inflightCounterConcurrency.getMax()).isEqualTo(1000);
 	}
 
 	@Test
@@ -422,23 +426,48 @@ class ReactiveMessagePipelineTest {
 			this.numMessages = numMessages;
 		}
 
+		private final List<MessageId> acknowledgedMessages = new CopyOnWriteArrayList<>();
+
+		private final List<MessageId> nackedMessages = new CopyOnWriteArrayList<>();
+
 		@Override
 		public <R> Flux<R> consumeMany(Function<Flux<Message<String>>, Publisher<MessageResult<R>>> messageHandler) {
 			return Flux.defer(() -> {
 				Flux<Message<String>> messages = Flux.range(0, this.numMessages).map(Object::toString)
 						.map(TestMessage::new);
-				return Flux.from(messageHandler.apply(messages)).mapNotNull(MessageResult::getValue);
+				return Flux.from(messageHandler.apply(messages)).doOnNext((result) -> {
+					if (result.isAcknowledgeMessage()) {
+						this.acknowledgedMessages.add(result.getMessageId());
+					}
+					else {
+						this.nackedMessages.add(result.getMessageId());
+					}
+				}).mapNotNull(MessageResult::getValue);
 			});
 		}
 
+		List<MessageId> getAcknowledgedMessages() {
+			return this.acknowledgedMessages;
+		}
+
+		List<MessageId> getNackedMessages() {
+			return this.nackedMessages;
+		}
+
 	}
 
 	static class TestMessage implements Message<String> {
 
+		private static final AtomicLong MESSAGE_ID_GENERATOR = new AtomicLong(0L);
+
 		private final String value;
 
+		private final MessageId messageId;
+
 		TestMessage(String value) {
 			this.value = value;
+			this.messageId = DefaultImplementation.getDefaultImplementation().newMessageId(123456L,
+					MESSAGE_ID_GENERATOR.incrementAndGet(), -1);
 		}
 
 		@Override
@@ -473,7 +502,7 @@ class ReactiveMessagePipelineTest {
 
 		@Override
 		public MessageId getMessageId() {
-			return null;
+			return this.messageId;
 		}
 
 		@Override
@@ -590,10 +619,8 @@ class ReactiveMessagePipelineTest {
 		AtomicInteger current = new AtomicInteger();
 
 		private void begin() {
-			this.max.updateAndGet((currentMax) -> {
-				int incremented = this.current.incrementAndGet();
-				return (incremented > currentMax) ? incremented : currentMax;
-			});
+			int incremented = this.current.incrementAndGet();
+			this.max.updateAndGet((currentMax) -> Math.max(incremented, currentMax));
 		}
 
 		private void end() {