You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/10/24 11:31:58 UTC
[pulsar-client-reactive] branch main updated: Accept Reactive Streams Publisher as input (#12)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 855ce09 Accept Reactive Streams Publisher as input (#12)
855ce09 is described below
commit 855ce09d269f47739664ed3ece3452e066c305fa
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Mon Oct 24 13:31:53 2022 +0200
Accept Reactive Streams Publisher as input (#12)
---
README.adoc | 4 ++--
.../client/adapter/ReactiveMessageConsumerE2ETest.java | 2 +-
.../client/adapter/ReactiveMessagePipelineE2ETest.java | 4 ++--
.../client/adapter/ReactiveMessageReaderE2ETest.java | 2 +-
.../client/adapter/ReactiveMessageSenderE2ETest.java | 5 ++---
.../adapter/AdaptedReactiveMessageConsumer.java | 14 +++++++++-----
.../internal/adapter/AdaptedReactiveMessageSender.java | 11 ++++++-----
.../reactive/client/api/ReactiveMessageConsumer.java | 3 ++-
.../client/api/ReactiveMessagePipelineBuilder.java | 7 ++++---
.../reactive/client/api/ReactiveMessageSender.java | 17 +++++++++++++++--
.../api/DefaultReactiveMessagePipelineBuilder.java | 13 +++++++------
11 files changed, 51 insertions(+), 31 deletions(-)
diff --git a/README.adoc b/README.adoc
index 6516dfa..a4760c2 100644
--- a/README.adoc
+++ b/README.adoc
@@ -74,7 +74,7 @@ ReactiveMessageSender<String> messageSender = reactivePulsarClient
.maxInflight(100)
.build();
Mono<MessageId> messageId = messageSender
- .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
+ .send(MessageSpec.of("Hello world!"));
// for demonstration
messageId.subscribe(System.out::println);
----
@@ -122,7 +122,7 @@ ReactiveMessageSender<String> messageSender = reactivePulsarClient
.maxInflight(100)
.build();
Mono<MessageId> messageId = messageSender
- .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
+ .send(MessageSpec.of("Hello world!"));
// for demonstration
messageId.subscribe(System.out::println);
----
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
index 2a5ada8..833ce88 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
@@ -48,7 +48,7 @@ public class ReactiveMessageConsumerE2ETest {
ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
.cache(producerCache).topic(topicName).build();
- messageSender.sendMessages(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
+ messageSender.send(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
ReactiveMessageConsumer<String> messageConsumer = reactivePulsarClient.messageConsumer(Schema.STRING)
.topic(topicName).subscriptionName("sub").build();
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
index a0927eb..5b69a51 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
@@ -68,7 +68,7 @@ public class ReactiveMessagePipelineE2ETest {
ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
.topic(topicName).build();
- messageSender.sendMessages(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
+ messageSender.send(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
List<String> messages = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(100);
@@ -102,7 +102,7 @@ public class ReactiveMessagePipelineE2ETest {
List<MessageSpec<Integer>> messageSpecs = generateRandomOrderedMessagesWhereSingleKeyIsOrdered(
messageOrderScenario);
- messageSender.sendMessages(Flux.fromIterable(messageSpecs)).blockLast();
+ messageSender.send(Flux.fromIterable(messageSpecs)).blockLast();
ConcurrentMap<Integer, List<Integer>> messages = new ConcurrentHashMap<>();
CountDownLatch latch = new CountDownLatch(messageSpecs.size());
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
index ed7a7f9..a82fd3b 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
@@ -46,7 +46,7 @@ public class ReactiveMessageReaderE2ETest {
ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
.cache(producerCache).topic(topicName).build();
- messageSender.sendMessages(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
+ messageSender.send(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
ReactiveMessageReader<String> messageReader = reactivePulsarClient.messageReader(Schema.STRING)
.topic(topicName).build();
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
index 927f715..078a0d6 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
@@ -37,7 +37,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-import reactor.core.publisher.Mono;
import static org.assertj.core.api.Assertions.assertThat;
@@ -64,7 +63,7 @@ public class ReactiveMessageSenderE2ETest {
ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
.topic(topicName).maxInflight(1).build();
- MessageId messageId = messageSender.sendMessage(Mono.just(MessageSpec.of("Hello world!"))).block();
+ MessageId messageId = messageSender.send(MessageSpec.of("Hello world!")).block();
assertThat(messageId).isNotNull();
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
@@ -87,7 +86,7 @@ public class ReactiveMessageSenderE2ETest {
ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
.cache(producerCache).maxInflight(1).topic(topicName).build();
- MessageId messageId = messageSender.sendMessage(Mono.just(MessageSpec.of("Hello world!"))).block();
+ MessageId messageId = messageSender.send(MessageSpec.of("Hello world!")).block();
assertThat(messageId).isNotNull();
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
index d3b5240..cdd44e7 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.MessageResult;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
+import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
@@ -205,11 +206,14 @@ class AdaptedReactiveMessageConsumer<T> implements ReactiveMessageConsumer<T> {
}
@Override
- public <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, Flux<MessageResult<R>>> messageHandler) {
- return createReactiveConsumerAdapter().usingConsumerMany((consumer) -> Flux.using(this::pinAcknowledgeScheduler,
- (pinnedAcknowledgeScheduler) -> messageHandler.apply(readNextMessage(consumer).repeat()).delayUntil(
- (messageResult) -> handleAcknowledgement(consumer, messageResult, pinnedAcknowledgeScheduler))
- .handle(this::handleMessageResult),
+ public <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, Publisher<MessageResult<R>>> messageHandler) {
+ return createReactiveConsumerAdapter().usingConsumerMany((consumer) -> Flux.using(
+ this::pinAcknowledgeScheduler, (
+ pinnedAcknowledgeScheduler) -> Flux
+ .from(messageHandler.apply(readNextMessage(consumer).repeat()))
+ .delayUntil((messageResult) -> handleAcknowledgement(consumer, messageResult,
+ pinnedAcknowledgeScheduler))
+ .handle(this::handleMessageResult),
Scheduler::dispose));
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
index d5843cc..038b885 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
+import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -178,9 +179,8 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
}
@Override
- public Mono<MessageId> sendMessage(Mono<MessageSpec<T>> messageSpec) {
- return createReactiveProducerAdapter()
- .usingProducer((producer) -> messageSpec.flatMap((m) -> createMessageMono(m, producer)));
+ public Mono<MessageId> send(MessageSpec<T> messageSpec) {
+ return createReactiveProducerAdapter().usingProducer((producer) -> createMessageMono(messageSpec, producer));
}
private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec, Producer<T> producer) {
@@ -192,11 +192,12 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
}
@Override
- public Flux<MessageId> sendMessages(Flux<MessageSpec<T>> messageSpecs) {
+ public Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs) {
return createReactiveProducerAdapter().usingProducerMany((producer) ->
// TODO: ensure that inner publishers are subscribed in order so that message
// order is retained
- messageSpecs.flatMapSequential((messageSpec) -> createMessageMono(messageSpec, producer), this.maxConcurrency));
+ Flux.from(messageSpecs).flatMapSequential((messageSpec) -> createMessageMono(messageSpec, producer),
+ this.maxConcurrency));
}
}
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
index 728f089..312fa48 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
@@ -19,6 +19,7 @@ package org.apache.pulsar.reactive.client.api;
import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
+import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -26,7 +27,7 @@ public interface ReactiveMessageConsumer<T> {
<R> Mono<R> consumeMessage(Function<Mono<Message<T>>, Mono<MessageResult<R>>> messageHandler);
- <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, Flux<MessageResult<R>>> messageHandler);
+ <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, Publisher<MessageResult<R>>> messageHandler);
/**
* Creates the Pulsar Consumer and immediately closes it. This is useful for creating
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
index ab8e51d..86359a0 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
@@ -21,18 +21,19 @@ import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
+import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
public interface ReactiveMessagePipelineBuilder<T> {
- OneByOneMessagePipelineBuilder<T> messageHandler(Function<Message<T>, Mono<Void>> messageHandler);
+ OneByOneMessagePipelineBuilder<T> messageHandler(Function<Message<T>, Publisher<Void>> messageHandler);
ReactiveMessagePipelineBuilder<T> streamingMessageHandler(
- Function<Flux<Message<T>>, Flux<MessageResult<Void>>> streamingMessageHandler);
+ Function<Flux<Message<T>>, Publisher<MessageResult<Void>>> streamingMessageHandler);
- ReactiveMessagePipelineBuilder<T> transformPipeline(Function<Mono<Void>, Mono<Void>> transformer);
+ ReactiveMessagePipelineBuilder<T> transformPipeline(Function<Mono<Void>, Publisher<Void>> transformer);
ReactiveMessagePipelineBuilder<T> pipelineRetrySpec(Retry pipelineRetrySpec);
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
index 2edeb0f..50640dc 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
@@ -17,13 +17,26 @@
package org.apache.pulsar.reactive.client.api;
import org.apache.pulsar.client.api.MessageId;
+import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface ReactiveMessageSender<T> {
- Mono<MessageId> sendMessage(Mono<MessageSpec<T>> messageSpec);
+ /**
+ * Send one message.
+ * @param messageSpec the spec of the message to send
+ * @return a publisher that will emit one message id and complete
+ */
+ Mono<MessageId> send(MessageSpec<T> messageSpec);
- Flux<MessageId> sendMessages(Flux<MessageSpec<T>> messageSpecs);
+ /**
+ * Send multiple messages and get the associated message ids in the same order as the
+ * sent messages.
+ * @param messageSpecs the specs of the messages to send
+ * @return a publisher that will emit a message id per message successfully sent in
+ * the order that they have been sent
+ */
+ Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs);
}
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
index e92f683..59358ec 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.reactive.client.api.MessageResult;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder;
+import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
@@ -55,7 +56,7 @@ class DefaultReactiveMessagePipelineBuilder<T>
private final ReactiveMessageConsumer<T> messageConsumer;
- private Function<Message<T>, Mono<Void>> messageHandler;
+ private Function<Message<T>, Publisher<Void>> messageHandler;
private BiConsumer<Message<T>, Throwable> errorLogger;
@@ -67,9 +68,9 @@ class DefaultReactiveMessagePipelineBuilder<T>
private Duration handlingTimeout = Duration.ofSeconds(120);
- private Function<Mono<Void>, Mono<Void>> transformer = Function.identity();
+ private Function<Mono<Void>, Publisher<Void>> transformer = (it) -> it;
- private Function<Flux<Message<T>>, Flux<MessageResult<Void>>> streamingMessageHandler;
+ private Function<Flux<Message<T>>, Publisher<MessageResult<Void>>> streamingMessageHandler;
private int concurrency;
@@ -82,14 +83,14 @@ class DefaultReactiveMessagePipelineBuilder<T>
}
@Override
- public OneByOneMessagePipelineBuilder<T> messageHandler(Function<Message<T>, Mono<Void>> messageHandler) {
+ public OneByOneMessagePipelineBuilder<T> messageHandler(Function<Message<T>, Publisher<Void>> messageHandler) {
this.messageHandler = messageHandler;
return this;
}
@Override
public ReactiveMessagePipelineBuilder<T> streamingMessageHandler(
- Function<Flux<Message<T>>, Flux<MessageResult<Void>>> streamingMessageHandler) {
+ Function<Flux<Message<T>>, Publisher<MessageResult<Void>>> streamingMessageHandler) {
this.streamingMessageHandler = streamingMessageHandler;
return this;
}
@@ -145,7 +146,7 @@ class DefaultReactiveMessagePipelineBuilder<T>
}
@Override
- public ReactiveMessagePipelineBuilder<T> transformPipeline(Function<Mono<Void>, Mono<Void>> transformer) {
+ public ReactiveMessagePipelineBuilder<T> transformPipeline(Function<Mono<Void>, Publisher<Void>> transformer) {
this.transformer = transformer;
return this;
}