Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/20 15:52:15 UTC
[GitHub] [pulsar-client-reactive] cbornet opened a new pull request, #12: Change ReactiveMessageSender API to use r.s.Publisher
cbornet opened a new pull request, #12:
URL: https://github.com/apache/pulsar-client-reactive/pull/12
Fixes #1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-reactive] lhotari commented on a diff in pull request #12: Accept Reactive Streams Publisher as input
Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #12:
URL: https://github.com/apache/pulsar-client-reactive/pull/12#discussion_r1001611489
##########
README.adoc:
##########
@@ -73,8 +73,8 @@ ReactiveMessageSender<String> messageSender = reactivePulsarClient
.topic(topicName)
.maxInflight(100)
.build();
-Mono<MessageId> messageId = messageSender
- .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
+Publisher<MessageId> messageId = messageSender
Review Comment:
I guess we could keep `Mono` here?
##########
README.adoc:
##########
@@ -73,8 +73,8 @@ ReactiveMessageSender<String> messageSender = reactivePulsarClient
.topic(topicName)
.maxInflight(100)
.build();
-Mono<MessageId> messageId = messageSender
- .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
+Publisher<MessageId> messageId = messageSender
Review Comment:
```suggestion
Mono<MessageId> messageId = messageSender
```
##########
pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java:
##########
@@ -192,11 +192,12 @@ private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec, Producer<T
}
@Override
- public Flux<MessageId> sendMessages(Flux<MessageSpec<T>> messageSpecs) {
+ public Flux<MessageId> sendAll(Publisher<MessageSpec<T>> messageSpecs) {
Review Comment:
```suggestion
public Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs) {
```
I'd prefer `send` over `sendAll`.
[GitHub] [pulsar-client-reactive] lhotari merged pull request #12: Accept Reactive Streams Publisher as input
Posted by GitBox <gi...@apache.org>.
lhotari merged PR #12:
URL: https://github.com/apache/pulsar-client-reactive/pull/12
[GitHub] [pulsar-client-reactive] cbornet commented on a diff in pull request #12: Accept Reactive Streams Publisher as input
Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #12:
URL: https://github.com/apache/pulsar-client-reactive/pull/12#discussion_r1001710463
##########
pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java:
##########
@@ -192,11 +192,12 @@ private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec, Producer<T
}
@Override
- public Flux<MessageId> sendMessages(Flux<MessageSpec<T>> messageSpecs) {
+ public Flux<MessageId> sendAll(Publisher<MessageSpec<T>> messageSpecs) {
Review Comment:
I also prefer `send`. I had used `sendAll` because R2DBC has `saveAll`, but reactor-kafka also uses `send`.
Done
[GitHub] [pulsar-client-reactive] lhotari commented on a diff in pull request #12: Change ReactiveMessageSender API to use r.s.Publisher
Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #12:
URL: https://github.com/apache/pulsar-client-reactive/pull/12#discussion_r1001524358
##########
pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java:
##########
@@ -17,13 +17,24 @@
package org.apache.pulsar.reactive.client.api;
import org.apache.pulsar.client.api.MessageId;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
+import org.reactivestreams.Publisher;
public interface ReactiveMessageSender<T> {
- Mono<MessageId> sendMessage(Mono<MessageSpec<T>> messageSpec);
+ /**
+ * Send one message.
+ * @param messageSpec the spec of the message to send
+ * @return a publisher that will emit one message id and complete
+ */
+ Publisher<MessageId> send(MessageSpec<T> messageSpec);
Review Comment:
```suggestion
Mono<MessageId> send(MessageSpec<T> messageSpec);
```
##########
pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java:
##########
@@ -17,13 +17,24 @@
package org.apache.pulsar.reactive.client.api;
import org.apache.pulsar.client.api.MessageId;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
+import org.reactivestreams.Publisher;
public interface ReactiveMessageSender<T> {
- Mono<MessageId> sendMessage(Mono<MessageSpec<T>> messageSpec);
+ /**
+ * Send one message.
+ * @param messageSpec the spec of the message to send
+ * @return a publisher that will emit one message id and complete
+ */
+ Publisher<MessageId> send(MessageSpec<T> messageSpec);
- Flux<MessageId> sendMessages(Flux<MessageSpec<T>> messageSpecs);
+ /**
+ * Send multiple messages and get the associated message ids in the same order as the
+ * sent messages.
+ * @param messageSpecs the specs of the messages to send
+ * @return a publisher that will emit a message id per message successfully sent in
+ * the order that they have been sent
+ */
+ Publisher<MessageId> send(Publisher<MessageSpec<T>> messageSpecs);
Review Comment:
```suggestion
Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs);
```
[GitHub] [pulsar-client-reactive] cbornet commented on a diff in pull request #12: Accept Reactive Streams Publisher as input
Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #12:
URL: https://github.com/apache/pulsar-client-reactive/pull/12#discussion_r1001709345
##########
README.adoc:
##########
@@ -73,8 +73,8 @@ ReactiveMessageSender<String> messageSender = reactivePulsarClient
.topic(topicName)
.maxInflight(100)
.build();
-Mono<MessageId> messageId = messageSender
- .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
+Publisher<MessageId> messageId = messageSender
Review Comment:
Yes, I forgot to change it.
Done.
[GitHub] [pulsar-client-reactive] cbornet commented on pull request #12: Accept Reactive Streams Publisher as input
Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #12:
URL: https://github.com/apache/pulsar-client-reactive/pull/12#issuecomment-1286702030
OK. I updated the API.
I also made `ReactiveMessageConsumer::consumeMessages` accept a `Publisher` as the output of the user function.
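For illustration, here is a minimal sketch of the overload pair discussed in this PR: one `send` that takes a single spec and one that takes any publisher of specs. It is not the library's implementation. To stay self-contained it uses the JDK's `java.util.concurrent.Flow.Publisher` as a stand-in for `org.reactivestreams.Publisher`, so the return types are plain publishers rather than `Mono`/`Flux`. The names `SendOverloadSketch`, `Spec`, `Sender`, `ListPublisher`, `map`, `collect` and the `id-N` message ids are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

final class SendOverloadSketch {

    /** Hypothetical stand-in for MessageSpec<T>; it only wraps a payload. */
    static final class Spec<T> {
        final T value;
        Spec(T value) { this.value = value; }
    }

    /** Hypothetical sender with the overload pair discussed in the review. */
    interface Sender<T> {
        Flow.Publisher<String> send(Spec<T> spec);
        Flow.Publisher<String> send(Flow.Publisher<Spec<T>> specs);
    }

    /** Simplified single-threaded publisher over a fixed list that honors request(n). */
    static final class ListPublisher<T> implements Flow.Publisher<T> {
        private final List<T> items;
        ListPublisher(List<T> items) { this.items = List.copyOf(items); }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                    if (emitting) return; // re-entrant request: the running loop sees the new demand
                    emitting = true;
                    while (!done && demand > 0 && next < items.size()) {
                        demand--;
                        subscriber.onNext(items.get(next++));
                    }
                    emitting = false;
                    if (!done && next == items.size()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Maps each item of the source; demand and cancellation pass straight through. */
    static <A, B> Flow.Publisher<B> map(Flow.Publisher<A> source, Function<A, B> fn) {
        return downstream -> source.subscribe(new Flow.Subscriber<A>() {
            @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(A item) { downstream.onNext(fn.apply(item)); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    /** Builds a fake sender that hands out sequential ids instead of talking to a broker. */
    static <T> Sender<T> newSender() {
        AtomicLong sequence = new AtomicLong();
        return new Sender<T>() {
            @Override
            public Flow.Publisher<String> send(Spec<T> spec) {
                // The single-message overload delegates to the publisher overload.
                return send(new ListPublisher<Spec<T>>(List.of(spec)));
            }
            @Override
            public Flow.Publisher<String> send(Flow.Publisher<Spec<T>> specs) {
                // Ids are assigned as items flow through, so input order is preserved.
                return map(specs, spec -> "id-" + sequence.getAndIncrement());
            }
        };
    }

    /** Collects everything a synchronous publisher emits; only suitable for this sketch. */
    static <T> List<T> collect(Flow.Publisher<T> publisher) {
        List<T> out = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { out.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return out;
    }

    public static void main(String[] args) {
        Sender<String> sender = newSender();
        Spec<String> hello = new Spec<String>("Hello world!");
        System.out.println(collect(sender.send(hello)));
        Flow.Publisher<Spec<String>> batch =
                new ListPublisher<Spec<String>>(List.of(new Spec<String>("a"), new Spec<String>("b")));
        System.out.println(collect(sender.send(batch)));
    }
}
```

Because the two overloads differ in parameter type (a spec versus a publisher of specs), a single name `send` is unambiguous at the call site, which matches the preference for `send` over `sendAll` expressed above. In the real API the return types suggested in the review are `Mono<MessageId>` and `Flux<MessageId>`; the sketch cannot show those without Reactor on the classpath.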