You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/12/02 05:33:22 UTC

[pulsar-client-reactive] branch main updated: Fix incorrect cast in DefaultReactiveMessagePipeline (#74)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 6bc4cd6  Fix incorrect cast in DefaultReactiveMessagePipeline (#74)
6bc4cd6 is described below

commit 6bc4cd6363959034506f42e47239ebfef1e0fb85
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Dec 2 06:33:18 2022 +0100

    Fix incorrect cast in DefaultReactiveMessagePipeline (#74)
---
 .../internal/api/DefaultReactiveMessagePipeline.java    | 17 +++++++++--------
 .../api/DefaultReactiveMessagePipelineBuilder.java      |  2 +-
 2 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
index 90b7d2b..7c54c12 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.reactive.client.api.MessageGroupingFunction;
 import org.apache.pulsar.reactive.client.api.MessageResult;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
 import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.Disposable;
@@ -47,7 +48,7 @@ class DefaultReactiveMessagePipeline<T> implements ReactiveMessagePipeline {
 
 	private final Mono<Void> pipeline;
 
-	private final Function<Message<T>, Mono<Void>> messageHandler;
+	private final Function<Message<T>, Publisher<Void>> messageHandler;
 
 	private final BiConsumer<Message<T>, Throwable> errorLogger;
 
@@ -55,7 +56,7 @@ class DefaultReactiveMessagePipeline<T> implements ReactiveMessagePipeline {
 
 	private final Duration handlingTimeout;
 
-	private final Function<Flux<Message<T>>, Flux<MessageResult<Void>>> streamingMessageHandler;
+	private final Function<Flux<Message<T>>, Publisher<MessageResult<Void>>> streamingMessageHandler;
 
 	private final int concurrency;
 
@@ -64,9 +65,9 @@ class DefaultReactiveMessagePipeline<T> implements ReactiveMessagePipeline {
 	private final MessageGroupingFunction groupingFunction;
 
 	DefaultReactiveMessagePipeline(ReactiveMessageConsumer<T> messageConsumer,
-			Function<Message<T>, Mono<Void>> messageHandler, BiConsumer<Message<T>, Throwable> errorLogger,
-			Retry pipelineRetrySpec, Duration handlingTimeout, Function<Mono<Void>, Mono<Void>> transformer,
-			Function<Flux<Message<T>>, Flux<MessageResult<Void>>> streamingMessageHandler,
+			Function<Message<T>, Publisher<Void>> messageHandler, BiConsumer<Message<T>, Throwable> errorLogger,
+			Retry pipelineRetrySpec, Duration handlingTimeout, Function<Mono<Void>, Publisher<Void>> transformer,
+			Function<Flux<Message<T>>, Publisher<MessageResult<Void>>> streamingMessageHandler,
 			MessageGroupingFunction groupingFunction, int concurrency, int maxInflight) {
 		this.messageHandler = messageHandler;
 		this.errorLogger = errorLogger;
@@ -130,13 +131,13 @@ class DefaultReactiveMessagePipeline<T> implements ReactiveMessagePipeline {
 			}
 		}
 		else {
-			return Objects.requireNonNull(this.streamingMessageHandler,
-					"streamingMessageHandler or messageHandler must be set").apply(messageFlux);
+			return Flux.from(Objects.requireNonNull(this.streamingMessageHandler,
+					"streamingMessageHandler or messageHandler must be set").apply(messageFlux));
 		}
 	}
 
 	private Mono<MessageResult<Void>> handleMessage(Message<T> message) {
-		return this.messageHandler.apply(message).transform(this::decorateMessageHandler)
+		return Mono.from(this.messageHandler.apply(message)).transform(this::decorateMessageHandler)
 				.thenReturn(MessageResult.acknowledge(message.getMessageId())).onErrorResume((throwable) -> {
 					if (this.errorLogger != null) {
 						try {
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
index 3d68357..dac1ef4 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
@@ -155,7 +155,7 @@ class DefaultReactiveMessagePipelineBuilder<T>
 		if (this.messageHandler == null && this.streamingMessageHandler == null) {
 			throw new NullPointerException("messageHandler or streamingMessageHandler must be set.");
 		}
-		return new DefaultReactiveMessagePipeline(this.messageConsumer, this.messageHandler, this.errorLogger,
+		return new DefaultReactiveMessagePipeline<>(this.messageConsumer, this.messageHandler, this.errorLogger,
 				this.pipelineRetrySpec, this.handlingTimeout, this.transformer, this.streamingMessageHandler,
 				this.groupingFunction, this.concurrency, this.maxInflight);
 	}