You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/12/01 12:19:41 UTC
[pulsar-client-reactive] branch main updated: Fix InflightCounter used in ReactiveMessagePipelineTest (#69)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new a518e87 Fix InflightCounter used in ReactiveMessagePipelineTest (#69)
a518e87 is described below
commit a518e878a0a8aaeb81885a1c49a017d4e5d70e7c
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Dec 1 14:19:35 2022 +0200
Fix InflightCounter used in ReactiveMessagePipelineTest (#69)
* Fix InflightCounter used in ReactiveMessagePipelineTest
- max.updateAndGet might get called multiple times when there are races
* Fix flaky ReactiveMessagePipelineTest.handlingTimeout
* Rename test method errorHandler -> errorLogger
---
.../client/api/ReactiveMessagePipelineTest.java | 91 ++++++++++++++--------
1 file changed, 59 insertions(+), 32 deletions(-)
diff --git a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
index 5b55b17..ea8d548 100644
--- a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
+++ b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTest.java
@@ -17,18 +17,23 @@
package org.apache.pulsar.reactive.client.api;
import java.time.Duration;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.api.EncryptionContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -181,36 +186,35 @@ class ReactiveMessagePipelineTest {
void handlingTimeout() throws Exception {
int numMessages = 10;
TestConsumer testConsumer = new TestConsumer(numMessages);
- CountDownLatch latch1 = new CountDownLatch(numMessages);
- CountDownLatch latch2 = new CountDownLatch(numMessages);
- Function<Message<String>, Publisher<Void>> messageHandler = (message) -> {
- latch1.countDown();
- return Mono.delay(Duration.ofMillis(2)).doOnNext((it) -> latch2.countDown()).then();
- };
- try (ReactiveMessagePipeline pipeline = testConsumer.messagePipeline().messageHandler(messageHandler).build()) {
- pipeline.start();
- assertThat(latch1.await(10000, TimeUnit.MILLISECONDS)).isTrue();
- assertThat(latch2.await(10, TimeUnit.MILLISECONDS)).isTrue();
- }
-
- CountDownLatch latch3 = new CountDownLatch(numMessages);
- CountDownLatch latch4 = new CountDownLatch(numMessages);
- Function<Message<String>, Publisher<Void>> messageHandler2 = (message) -> {
- latch3.countDown();
- return Mono.delay(Duration.ofMillis(2)).doOnNext((it) -> latch4.countDown()).then();
- };
- try (ReactiveMessagePipeline pipeline = testConsumer.messagePipeline().messageHandler(messageHandler2)
- .handlingTimeout(Duration.ofMillis(1)).build()) {
+ CountDownLatch latch = new CountDownLatch(numMessages);
+ AtomicReference<MessageId> timedoutMessageId = new AtomicReference<>();
+ Function<Message<String>, Publisher<Void>> messageHandler = (message) -> Mono.defer(() -> {
+ Duration delay;
+ if (message.getValue().equals("5")) {
+ delay = Duration.ofMillis(15);
+ timedoutMessageId.set(message.getMessageId());
+ }
+ else {
+ delay = Duration.ofMillis(2);
+ }
+ return Mono.delay(delay).doFinally((__) -> latch.countDown()).then();
+ });
+ try (ReactiveMessagePipeline pipeline = testConsumer.messagePipeline().messageHandler(messageHandler)
+ .handlingTimeout(Duration.ofMillis(5)).build()) {
pipeline.start();
- assertThat(latch3.await(100, TimeUnit.MILLISECONDS)).isTrue();
- assertThat(latch4.await(10, TimeUnit.MILLISECONDS)).isFalse();
- assertThat(latch4.getCount()).isEqualTo(10);
+ assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue();
+ // 9 messages should have been acked
+ assertThat(testConsumer.getAcknowledgedMessages()).hasSize(9);
+ // 1 message should have been nacked
+ assertThat(testConsumer.getNackedMessages()).hasSize(1);
+ assertThat(timedoutMessageId).isNotNull();
+ // the nacked message id should be the one with the longer processing delay
+ assertThat(testConsumer.getNackedMessages()).first().isEqualTo(timedoutMessageId.get());
}
-
}
@Test
- void errorHandler() throws Exception {
+ void errorLogger() throws Exception {
int numMessages = 10;
TestConsumer testConsumer = new TestConsumer(numMessages);
CountDownLatch latch = new CountDownLatch(numMessages);
@@ -256,7 +260,7 @@ class ReactiveMessagePipelineTest {
pipeline.start();
assertThat(latch2.await(1, TimeUnit.SECONDS)).isTrue();
}
- assertThat(inflightCounterConcurrency.getMax()).isGreaterThan(100);
+ assertThat(inflightCounterConcurrency.getMax()).isEqualTo(1000);
}
@Test
@@ -422,23 +426,48 @@ class ReactiveMessagePipelineTest {
this.numMessages = numMessages;
}
+ private final List<MessageId> acknowledgedMessages = new CopyOnWriteArrayList<>();
+
+ private final List<MessageId> nackedMessages = new CopyOnWriteArrayList<>();
+
@Override
public <R> Flux<R> consumeMany(Function<Flux<Message<String>>, Publisher<MessageResult<R>>> messageHandler) {
return Flux.defer(() -> {
Flux<Message<String>> messages = Flux.range(0, this.numMessages).map(Object::toString)
.map(TestMessage::new);
- return Flux.from(messageHandler.apply(messages)).mapNotNull(MessageResult::getValue);
+ return Flux.from(messageHandler.apply(messages)).doOnNext((result) -> {
+ if (result.isAcknowledgeMessage()) {
+ this.acknowledgedMessages.add(result.getMessageId());
+ }
+ else {
+ this.nackedMessages.add(result.getMessageId());
+ }
+ }).mapNotNull(MessageResult::getValue);
});
}
+ List<MessageId> getAcknowledgedMessages() {
+ return this.acknowledgedMessages;
+ }
+
+ List<MessageId> getNackedMessages() {
+ return this.nackedMessages;
+ }
+
}
static class TestMessage implements Message<String> {
+ private static final AtomicLong MESSAGE_ID_GENERATOR = new AtomicLong(0L);
+
private final String value;
+ private final MessageId messageId;
+
TestMessage(String value) {
this.value = value;
+ this.messageId = DefaultImplementation.getDefaultImplementation().newMessageId(123456L,
+ MESSAGE_ID_GENERATOR.incrementAndGet(), -1);
}
@Override
@@ -473,7 +502,7 @@ class ReactiveMessagePipelineTest {
@Override
public MessageId getMessageId() {
- return null;
+ return this.messageId;
}
@Override
@@ -590,10 +619,8 @@ class ReactiveMessagePipelineTest {
AtomicInteger current = new AtomicInteger();
private void begin() {
- this.max.updateAndGet((currentMax) -> {
- int incremented = this.current.incrementAndGet();
- return (incremented > currentMax) ? incremented : currentMax;
- });
+ int incremented = this.current.incrementAndGet();
+ this.max.updateAndGet((currentMax) -> Math.max(incremented, currentMax));
}
private void end() {