You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text version.
Posted to commits@pulsar.apache.org by cb...@apache.org on 2022/12/07 15:21:53 UTC

[pulsar-client-reactive] branch main updated: Require cache when maxInflight is set for ReactiveMessageSenderBuilder (#91)

This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 3340083  Require cache when maxInflight is set for ReactiveMessageSenderBuilder (#91)
3340083 is described below

commit 334008356ddb98a611058394f8903d4f08c2d6e6
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Dec 7 17:21:48 2022 +0200

    Require cache when maxInflight is set for ReactiveMessageSenderBuilder (#91)
---
 .../pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java    | 2 +-
 .../client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java    | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
index bc40351..f3acda1 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
@@ -62,7 +62,7 @@ class ReactiveMessageSenderE2ETest {
 				ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
 
 				ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
-						.topic(topicName).maxInflight(1).build();
+						.topic(topicName).build();
 				MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
 				assertThat(messageId).isNotNull();
 
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
index 18fb53a..20f75f8 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
@@ -99,6 +99,7 @@ class AdaptedReactiveMessageSenderBuilder<T> implements ReactiveMessageSenderBui
 	public ReactiveMessageSender<T> build() {
 		Object producerActionTransformerKey;
 		if (this.maxInflight > 0) {
+			Objects.requireNonNull(this.producerCache, "cache must be provided when maxInflight is set.");
 			this.producerActionTransformer = () -> new InflightLimiter(this.maxInflight,
 					Math.max(this.maxInflight / 2, 1), Schedulers.single(), this.maxConcurrentSenderSubscriptions);
 			producerActionTransformerKey = new ProducerActionTransformerKey(this.maxInflight,