Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/20 15:52:15 UTC

[GitHub] [pulsar-client-reactive] cbornet opened a new pull request, #12: Change ReactiveMessageSender API to use r.s.Publisher

cbornet opened a new pull request, #12:
URL: https://github.com/apache/pulsar-client-reactive/pull/12

   Fixes #1 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-reactive] lhotari commented on a diff in pull request #12: Accept Reactive Streams Publisher as input

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #12:
URL: https://github.com/apache/pulsar-client-reactive/pull/12#discussion_r1001611489


##########
README.adoc:
##########
@@ -73,8 +73,8 @@ ReactiveMessageSender<String> messageSender = reactivePulsarClient
         .topic(topicName)
         .maxInflight(100)
         .build();
-Mono<MessageId> messageId = messageSender
-        .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
+Publisher<MessageId> messageId = messageSender

Review Comment:
   I guess we could keep `Mono` here?



##########
README.adoc:
##########
@@ -73,8 +73,8 @@ ReactiveMessageSender<String> messageSender = reactivePulsarClient
         .topic(topicName)
         .maxInflight(100)
         .build();
-Mono<MessageId> messageId = messageSender
-        .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
+Publisher<MessageId> messageId = messageSender

Review Comment:
   ```suggestion
   Mono<MessageId> messageId = messageSender
   ```



##########
pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java:
##########
@@ -192,11 +192,12 @@ private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec, Producer<T
 	}
 
 	@Override
-	public Flux<MessageId> sendMessages(Flux<MessageSpec<T>> messageSpecs) {
+	public Flux<MessageId> sendAll(Publisher<MessageSpec<T>> messageSpecs) {

Review Comment:
   ```suggestion
   	public Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs) {
   ```
   I'd prefer `send` over `sendAll`.





[GitHub] [pulsar-client-reactive] lhotari merged pull request #12: Accept Reactive Streams Publisher as input

Posted by GitBox <gi...@apache.org>.
lhotari merged PR #12:
URL: https://github.com/apache/pulsar-client-reactive/pull/12




[GitHub] [pulsar-client-reactive] cbornet commented on a diff in pull request #12: Accept Reactive Streams Publisher as input

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #12:
URL: https://github.com/apache/pulsar-client-reactive/pull/12#discussion_r1001710463


##########
pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java:
##########
@@ -192,11 +192,12 @@ private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec, Producer<T
 	}
 
 	@Override
-	public Flux<MessageId> sendMessages(Flux<MessageSpec<T>> messageSpecs) {
+	public Flux<MessageId> sendAll(Publisher<MessageSpec<T>> messageSpecs) {

Review Comment:
   I also prefer `send`. I had used `sendAll` because R2DBC has `saveAll`, but reactor-kafka also uses `send`.
   Done.





[GitHub] [pulsar-client-reactive] lhotari commented on a diff in pull request #12: Change ReactiveMessageSender API to use r.s.Publisher

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #12:
URL: https://github.com/apache/pulsar-client-reactive/pull/12#discussion_r1001524358


##########
pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java:
##########
@@ -17,13 +17,24 @@
 package org.apache.pulsar.reactive.client.api;
 
 import org.apache.pulsar.client.api.MessageId;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
+import org.reactivestreams.Publisher;
 
 public interface ReactiveMessageSender<T> {
 
-	Mono<MessageId> sendMessage(Mono<MessageSpec<T>> messageSpec);
+	/**
+	 * Send one message.
+	 * @param messageSpec the spec of the message to send
+	 * @return a publisher that will emit one message id and complete
+	 */
+	Publisher<MessageId> send(MessageSpec<T> messageSpec);

Review Comment:
   ```suggestion
   	Mono<MessageId> send(MessageSpec<T> messageSpec);
   ```



##########
pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java:
##########
@@ -17,13 +17,24 @@
 package org.apache.pulsar.reactive.client.api;
 
 import org.apache.pulsar.client.api.MessageId;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
+import org.reactivestreams.Publisher;
 
 public interface ReactiveMessageSender<T> {
 
-	Mono<MessageId> sendMessage(Mono<MessageSpec<T>> messageSpec);
+	/**
+	 * Send one message.
+	 * @param messageSpec the spec of the message to send
+	 * @return a publisher that will emit one message id and complete
+	 */
+	Publisher<MessageId> send(MessageSpec<T> messageSpec);
 
-	Flux<MessageId> sendMessages(Flux<MessageSpec<T>> messageSpecs);
+	/**
+	 * Send multiple messages and get the associated message ids in the same order as the
+	 * sent messages.
+	 * @param messageSpecs the specs of the messages to send
+	 * @return a publisher that will emit a message id per message successfully sent in
+	 * the order that they have been sent
+	 */
+	Publisher<MessageId> send(Publisher<MessageSpec<T>> messageSpecs);

Review Comment:
   ```suggestion
   	Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs);
   ```





[GitHub] [pulsar-client-reactive] cbornet commented on a diff in pull request #12: Accept Reactive Streams Publisher as input

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #12:
URL: https://github.com/apache/pulsar-client-reactive/pull/12#discussion_r1001709345


##########
README.adoc:
##########
@@ -73,8 +73,8 @@ ReactiveMessageSender<String> messageSender = reactivePulsarClient
         .topic(topicName)
         .maxInflight(100)
         .build();
-Mono<MessageId> messageId = messageSender
-        .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
+Publisher<MessageId> messageId = messageSender

Review Comment:
   Yes, I forgot to change it.
   Done.





[GitHub] [pulsar-client-reactive] cbornet commented on pull request #12: Accept Reactive Streams Publisher as input

Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #12:
URL: https://github.com/apache/pulsar-client-reactive/pull/12#issuecomment-1286702030

   OK, I updated the API.
   I also made `ReactiveMessageConsumer::consumeMessages` accept a `Publisher` as the output of the user function.
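
For illustration only, here is a minimal, dependency-free sketch of the API shape discussed in this thread: two overloads that are both named `send`, one taking a single message spec and one taking a publisher of specs. It is not the pulsar-client-reactive implementation. The JDK's `java.util.concurrent.Flow.Publisher` stands in for `org.reactivestreams.Publisher` and for the `Mono`/`Flux` return types, and `Spec`, `Sender`, `FakeSender`, `fromList`, `map`, `collect` and the `id-N` strings are hypothetical names. Having the single-message overload delegate to the publisher overload is also an assumption.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

final class SendOverloadSketch {

    /** Hypothetical stand-in for MessageSpec<T>: it only carries a payload. */
    static final class Spec<T> {
        final T value;
        Spec(T value) { this.value = value; }
    }

    /** Hypothetical sender exposing two overloads that are both named send. */
    interface Sender<T> {
        Flow.Publisher<String> send(Spec<T> spec);
        Flow.Publisher<String> send(Flow.Publisher<Spec<T>> specs);
    }

    /** Fake sender: "sending" only assigns sequential ids, in input order. */
    static final class FakeSender<T> implements Sender<T> {
        private int counter;

        @Override public Flow.Publisher<String> send(Spec<T> spec) {
            // Assumption: the single-message overload delegates to the other one.
            Flow.Publisher<Spec<T>> one = fromList(List.of(spec));
            return send(one);
        }

        @Override public Flow.Publisher<String> send(Flow.Publisher<Spec<T>> specs) {
            return map(specs, spec -> "id-" + (++counter));
        }
    }

    /** Synchronous publisher over a list that emits only what was requested. */
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final Iterator<T> it = items.iterator();
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be > 0"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) return; // re-entrant request: the outer loop continues
                emitting = true;
                while (!done && demand > 0 && it.hasNext()) {
                    demand--;
                    subscriber.onNext(it.next());
                }
                emitting = false;
                if (!done && !it.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() { done = true; }
        });
    }

    /** Applies fn to each item; demand and cancellation pass straight through. */
    static <A, B> Flow.Publisher<B> map(Flow.Publisher<A> source, Function<A, B> fn) {
        return downstream -> source.subscribe(new Flow.Subscriber<A>() {
            @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(A item) { downstream.onNext(fn.apply(item)); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    /** Helper for synchronous publishers only: requests everything and collects it. */
    static <T> List<T> collect(Flow.Publisher<T> publisher) {
        List<T> out = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { out.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return out;
    }

    static List<String> demo() {
        FakeSender<String> sender = new FakeSender<>();
        Spec<String> hello = new Spec<>("Hello world!");
        List<Spec<String>> batch = List.of(new Spec<>("a"), new Spec<>("b"));
        Flow.Publisher<Spec<String>> many = fromList(batch);
        List<String> ids = new ArrayList<>(collect(sender.send(hello))); // picks send(Spec)
        ids.addAll(collect(sender.send(many)));                          // picks send(Publisher)
        return ids;
    }

    public static void main(String[] args) { System.out.println(demo()); }
}
```

Because the two overloads differ in the static type of their argument, callers can use the single name `send` for both cases and the compiler selects the right one. In the real API the return types would be Reactor's `Mono` and `Flux`, as suggested in the review comments, rather than a bare publisher.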

