You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/10/24 11:31:58 UTC

[pulsar-client-reactive] branch main updated: Accept Reactive Streams Publisher as input (#12)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 855ce09  Accept Reactive Streams Publisher as input (#12)
855ce09 is described below

commit 855ce09d269f47739664ed3ece3452e066c305fa
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Mon Oct 24 13:31:53 2022 +0200

    Accept Reactive Streams Publisher as input (#12)
---
 README.adoc                                             |  4 ++--
 .../client/adapter/ReactiveMessageConsumerE2ETest.java  |  2 +-
 .../client/adapter/ReactiveMessagePipelineE2ETest.java  |  4 ++--
 .../client/adapter/ReactiveMessageReaderE2ETest.java    |  2 +-
 .../client/adapter/ReactiveMessageSenderE2ETest.java    |  5 ++---
 .../adapter/AdaptedReactiveMessageConsumer.java         | 14 +++++++++-----
 .../internal/adapter/AdaptedReactiveMessageSender.java  | 11 ++++++-----
 .../reactive/client/api/ReactiveMessageConsumer.java    |  3 ++-
 .../client/api/ReactiveMessagePipelineBuilder.java      |  7 ++++---
 .../reactive/client/api/ReactiveMessageSender.java      | 17 +++++++++++++++--
 .../api/DefaultReactiveMessagePipelineBuilder.java      | 13 +++++++------
 11 files changed, 51 insertions(+), 31 deletions(-)

diff --git a/README.adoc b/README.adoc
index 6516dfa..a4760c2 100644
--- a/README.adoc
+++ b/README.adoc
@@ -74,7 +74,7 @@ ReactiveMessageSender<String> messageSender = reactivePulsarClient
         .maxInflight(100)
         .build();
 Mono<MessageId> messageId = messageSender
-        .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
+        .send(MessageSpec.of("Hello world!"));
 // for demonstration
 messageId.subscribe(System.out::println);
 ----
@@ -122,7 +122,7 @@ ReactiveMessageSender<String> messageSender = reactivePulsarClient
         .maxInflight(100)
         .build();
 Mono<MessageId> messageId = messageSender
-        .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
+        .send(MessageSpec.of("Hello world!"));
 // for demonstration
 messageId.subscribe(System.out::println);
 ----
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
index 2a5ada8..833ce88 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
@@ -48,7 +48,7 @@ public class ReactiveMessageConsumerE2ETest {
 
 			ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
 					.cache(producerCache).topic(topicName).build();
-			messageSender.sendMessages(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
+			messageSender.send(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
 
 			ReactiveMessageConsumer<String> messageConsumer = reactivePulsarClient.messageConsumer(Schema.STRING)
 					.topic(topicName).subscriptionName("sub").build();
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
index a0927eb..5b69a51 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
@@ -68,7 +68,7 @@ public class ReactiveMessagePipelineE2ETest {
 
 			ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
 					.topic(topicName).build();
-			messageSender.sendMessages(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
+			messageSender.send(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
 
 			List<String> messages = Collections.synchronizedList(new ArrayList<>());
 			CountDownLatch latch = new CountDownLatch(100);
@@ -102,7 +102,7 @@ public class ReactiveMessagePipelineE2ETest {
 			List<MessageSpec<Integer>> messageSpecs = generateRandomOrderedMessagesWhereSingleKeyIsOrdered(
 					messageOrderScenario);
 
-			messageSender.sendMessages(Flux.fromIterable(messageSpecs)).blockLast();
+			messageSender.send(Flux.fromIterable(messageSpecs)).blockLast();
 
 			ConcurrentMap<Integer, List<Integer>> messages = new ConcurrentHashMap<>();
 			CountDownLatch latch = new CountDownLatch(messageSpecs.size());
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
index ed7a7f9..a82fd3b 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
@@ -46,7 +46,7 @@ public class ReactiveMessageReaderE2ETest {
 
 			ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
 					.cache(producerCache).topic(topicName).build();
-			messageSender.sendMessages(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
+			messageSender.send(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
 
 			ReactiveMessageReader<String> messageReader = reactivePulsarClient.messageReader(Schema.STRING)
 					.topic(topicName).build();
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
index 927f715..078a0d6 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
@@ -37,7 +37,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
-import reactor.core.publisher.Mono;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -64,7 +63,7 @@ public class ReactiveMessageSenderE2ETest {
 
 			ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
 					.topic(topicName).maxInflight(1).build();
-			MessageId messageId = messageSender.sendMessage(Mono.just(MessageSpec.of("Hello world!"))).block();
+			MessageId messageId = messageSender.send(MessageSpec.of("Hello world!")).block();
 			assertThat(messageId).isNotNull();
 
 			Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
@@ -87,7 +86,7 @@ public class ReactiveMessageSenderE2ETest {
 
 			ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
 					.cache(producerCache).maxInflight(1).topic(topicName).build();
-			MessageId messageId = messageSender.sendMessage(Mono.just(MessageSpec.of("Hello world!"))).block();
+			MessageId messageId = messageSender.send(MessageSpec.of("Hello world!")).block();
 			assertThat(messageId).isNotNull();
 
 			Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
index d3b5240..cdd44e7 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.reactive.client.api.MessageResult;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
+import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.SynchronousSink;
@@ -205,11 +206,14 @@ class AdaptedReactiveMessageConsumer<T> implements ReactiveMessageConsumer<T> {
 	}
 
 	@Override
-	public <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, Flux<MessageResult<R>>> messageHandler) {
-		return createReactiveConsumerAdapter().usingConsumerMany((consumer) -> Flux.using(this::pinAcknowledgeScheduler,
-				(pinnedAcknowledgeScheduler) -> messageHandler.apply(readNextMessage(consumer).repeat()).delayUntil(
-						(messageResult) -> handleAcknowledgement(consumer, messageResult, pinnedAcknowledgeScheduler))
-						.handle(this::handleMessageResult),
+	public <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, Publisher<MessageResult<R>>> messageHandler) {
+		return createReactiveConsumerAdapter().usingConsumerMany((consumer) -> Flux.using(
+				this::pinAcknowledgeScheduler, (
+						pinnedAcknowledgeScheduler) -> Flux
+								.from(messageHandler.apply(readNextMessage(consumer).repeat()))
+								.delayUntil((messageResult) -> handleAcknowledgement(consumer, messageResult,
+										pinnedAcknowledgeScheduler))
+								.handle(this::handleMessageResult),
 				Scheduler::dispose));
 	}
 
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
index d5843cc..038b885 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
 import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
 import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
+import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -178,9 +179,8 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
 	}
 
 	@Override
-	public Mono<MessageId> sendMessage(Mono<MessageSpec<T>> messageSpec) {
-		return createReactiveProducerAdapter()
-				.usingProducer((producer) -> messageSpec.flatMap((m) -> createMessageMono(m, producer)));
+	public Mono<MessageId> send(MessageSpec<T> messageSpec) {
+		return createReactiveProducerAdapter().usingProducer((producer) -> createMessageMono(messageSpec, producer));
 	}
 
 	private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec, Producer<T> producer) {
@@ -192,11 +192,12 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
 	}
 
 	@Override
-	public Flux<MessageId> sendMessages(Flux<MessageSpec<T>> messageSpecs) {
+	public Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs) {
 		return createReactiveProducerAdapter().usingProducerMany((producer) ->
 		// TODO: ensure that inner publishers are subscribed in order so that message
 		// order is retained
-		messageSpecs.flatMapSequential((messageSpec) -> createMessageMono(messageSpec, producer), this.maxConcurrency));
+		Flux.from(messageSpecs).flatMapSequential((messageSpec) -> createMessageMono(messageSpec, producer),
+				this.maxConcurrency));
 	}
 
 }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
index 728f089..312fa48 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
@@ -19,6 +19,7 @@ package org.apache.pulsar.reactive.client.api;
 import java.util.function.Function;
 
 import org.apache.pulsar.client.api.Message;
+import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -26,7 +27,7 @@ public interface ReactiveMessageConsumer<T> {
 
 	<R> Mono<R> consumeMessage(Function<Mono<Message<T>>, Mono<MessageResult<R>>> messageHandler);
 
-	<R> Flux<R> consumeMessages(Function<Flux<Message<T>>, Flux<MessageResult<R>>> messageHandler);
+	<R> Flux<R> consumeMessages(Function<Flux<Message<T>>, Publisher<MessageResult<R>>> messageHandler);
 
 	/**
 	 * Creates the Pulsar Consumer and immediately closes it. This is useful for creating
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
index ab8e51d..86359a0 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
@@ -21,18 +21,19 @@ import java.util.function.BiConsumer;
 import java.util.function.Function;
 
 import org.apache.pulsar.client.api.Message;
+import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
 
 public interface ReactiveMessagePipelineBuilder<T> {
 
-	OneByOneMessagePipelineBuilder<T> messageHandler(Function<Message<T>, Mono<Void>> messageHandler);
+	OneByOneMessagePipelineBuilder<T> messageHandler(Function<Message<T>, Publisher<Void>> messageHandler);
 
 	ReactiveMessagePipelineBuilder<T> streamingMessageHandler(
-			Function<Flux<Message<T>>, Flux<MessageResult<Void>>> streamingMessageHandler);
+			Function<Flux<Message<T>>, Publisher<MessageResult<Void>>> streamingMessageHandler);
 
-	ReactiveMessagePipelineBuilder<T> transformPipeline(Function<Mono<Void>, Mono<Void>> transformer);
+	ReactiveMessagePipelineBuilder<T> transformPipeline(Function<Mono<Void>, Publisher<Void>> transformer);
 
 	ReactiveMessagePipelineBuilder<T> pipelineRetrySpec(Retry pipelineRetrySpec);
 
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
index 2edeb0f..50640dc 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
@@ -17,13 +17,26 @@
 package org.apache.pulsar.reactive.client.api;
 
 import org.apache.pulsar.client.api.MessageId;
+import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public interface ReactiveMessageSender<T> {
 
-	Mono<MessageId> sendMessage(Mono<MessageSpec<T>> messageSpec);
+	/**
+	 * Send one message.
+	 * @param messageSpec the spec of the message to send
+	 * @return a publisher that will emit one message id and complete
+	 */
+	Mono<MessageId> send(MessageSpec<T> messageSpec);
 
-	Flux<MessageId> sendMessages(Flux<MessageSpec<T>> messageSpecs);
+	/**
+	 * Send multiple messages and get the associated message ids in the same order as the
+	 * sent messages.
+	 * @param messageSpecs the specs of the messages to send
+	 * @return a publisher that will emit a message id per message successfully sent in
+	 * the order that they have been sent
+	 */
+	Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs);
 
 }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
index e92f683..59358ec 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.reactive.client.api.MessageResult;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
 import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
 import org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
@@ -55,7 +56,7 @@ class DefaultReactiveMessagePipelineBuilder<T>
 
 	private final ReactiveMessageConsumer<T> messageConsumer;
 
-	private Function<Message<T>, Mono<Void>> messageHandler;
+	private Function<Message<T>, Publisher<Void>> messageHandler;
 
 	private BiConsumer<Message<T>, Throwable> errorLogger;
 
@@ -67,9 +68,9 @@ class DefaultReactiveMessagePipelineBuilder<T>
 
 	private Duration handlingTimeout = Duration.ofSeconds(120);
 
-	private Function<Mono<Void>, Mono<Void>> transformer = Function.identity();
+	private Function<Mono<Void>, Publisher<Void>> transformer = (it) -> it;
 
-	private Function<Flux<Message<T>>, Flux<MessageResult<Void>>> streamingMessageHandler;
+	private Function<Flux<Message<T>>, Publisher<MessageResult<Void>>> streamingMessageHandler;
 
 	private int concurrency;
 
@@ -82,14 +83,14 @@ class DefaultReactiveMessagePipelineBuilder<T>
 	}
 
 	@Override
-	public OneByOneMessagePipelineBuilder<T> messageHandler(Function<Message<T>, Mono<Void>> messageHandler) {
+	public OneByOneMessagePipelineBuilder<T> messageHandler(Function<Message<T>, Publisher<Void>> messageHandler) {
 		this.messageHandler = messageHandler;
 		return this;
 	}
 
 	@Override
 	public ReactiveMessagePipelineBuilder<T> streamingMessageHandler(
-			Function<Flux<Message<T>>, Flux<MessageResult<Void>>> streamingMessageHandler) {
+			Function<Flux<Message<T>>, Publisher<MessageResult<Void>>> streamingMessageHandler) {
 		this.streamingMessageHandler = streamingMessageHandler;
 		return this;
 	}
@@ -145,7 +146,7 @@ class DefaultReactiveMessagePipelineBuilder<T>
 	}
 
 	@Override
-	public ReactiveMessagePipelineBuilder<T> transformPipeline(Function<Mono<Void>, Mono<Void>> transformer) {
+	public ReactiveMessagePipelineBuilder<T> transformPipeline(Function<Mono<Void>, Publisher<Void>> transformer) {
 		this.transformer = transformer;
 		return this;
 	}