You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by cb...@apache.org on 2022/12/07 15:21:53 UTC
[pulsar-client-reactive] branch main updated: Require cache when maxInflight is set for ReactiveMessageSenderBuilder (#91)
This is an automated email from the ASF dual-hosted git repository.
cbornet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 3340083 Require cache when maxInflight is set for ReactiveMessageSenderBuilder (#91)
3340083 is described below
commit 334008356ddb98a611058394f8903d4f08c2d6e6
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Dec 7 17:21:48 2022 +0200
Require cache when maxInflight is set for ReactiveMessageSenderBuilder (#91)
---
.../pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java | 2 +-
.../client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java | 1 +
2 files changed, 2 insertions(+), 1 deletion(-)
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
index bc40351..f3acda1 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
@@ -62,7 +62,7 @@ class ReactiveMessageSenderE2ETest {
ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
- .topic(topicName).maxInflight(1).build();
+ .topic(topicName).build();
MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
assertThat(messageId).isNotNull();
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
index 18fb53a..20f75f8 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
@@ -99,6 +99,7 @@ class AdaptedReactiveMessageSenderBuilder<T> implements ReactiveMessageSenderBui
public ReactiveMessageSender<T> build() {
Object producerActionTransformerKey;
if (this.maxInflight > 0) {
+ Objects.requireNonNull(this.producerCache, "cache must be provided when maxInflight is set.");
this.producerActionTransformer = () -> new InflightLimiter(this.maxInflight,
Math.max(this.maxInflight / 2, 1), Schedulers.single(), this.maxConcurrentSenderSubscriptions);
producerActionTransformerKey = new ProducerActionTransformerKey(this.maxInflight,