You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/10/27 16:40:29 UTC
[pulsar-client-reactive] branch main updated: Rename methods to use naming convention of [action]One and [action]Many (#14)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 3831ea6 Rename methods to use naming convention of [action]One and [action]Many (#14)
3831ea6 is described below
commit 3831ea64281050c42d7dd9da9b630bf7968b395d
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Oct 27 19:40:24 2022 +0300
Rename methods to use naming convention of [action]One and [action]Many (#14)
* Rename consume* and read* methods
* Rename send methods to be sendOne and sendMany
---
.../reactive/client/adapter/ReactiveMessageConsumerE2ETest.java | 6 +++---
.../reactive/client/adapter/ReactiveMessagePipelineE2ETest.java | 4 ++--
.../reactive/client/adapter/ReactiveMessageReaderE2ETest.java | 4 ++--
.../reactive/client/adapter/ReactiveMessageSenderE2ETest.java | 4 ++--
.../client/internal/adapter/AdaptedReactiveMessageConsumer.java | 6 +++---
.../client/internal/adapter/AdaptedReactiveMessageReader.java | 4 ++--
.../client/internal/adapter/AdaptedReactiveMessageSender.java | 4 ++--
.../apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java | 4 ++--
.../apache/pulsar/reactive/client/api/ReactiveMessageReader.java | 4 ++--
.../apache/pulsar/reactive/client/api/ReactiveMessageSender.java | 4 ++--
.../client/internal/api/DefaultReactiveMessagePipeline.java | 2 +-
11 files changed, 23 insertions(+), 23 deletions(-)
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
index 833ce88..ed617de 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
@@ -48,12 +48,12 @@ public class ReactiveMessageConsumerE2ETest {
ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
.cache(producerCache).topic(topicName).build();
- messageSender.send(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
+ messageSender.sendMany(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
ReactiveMessageConsumer<String> messageConsumer = reactivePulsarClient.messageConsumer(Schema.STRING)
.topic(topicName).subscriptionName("sub").build();
List<String> messages = messageConsumer
- .consumeMessages((messageFlux) -> messageFlux
+ .consumeMany((messageFlux) -> messageFlux
.map((message) -> MessageResult.acknowledge(message.getMessageId(), message.getValue())))
.take(Duration.ofSeconds(2)).collectList().block();
@@ -61,7 +61,7 @@ public class ReactiveMessageConsumerE2ETest {
// should have acknowledged all messages
List<Message<String>> remainingMessages = messageConsumer
- .consumeMessages((messageFlux) -> messageFlux.map(MessageResult::acknowledgeAndReturn))
+ .consumeMany((messageFlux) -> messageFlux.map(MessageResult::acknowledgeAndReturn))
.take(Duration.ofSeconds(2)).collectList().block();
assertThat(remainingMessages).isEmpty();
}
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
index 5b69a51..04c8c8d 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
@@ -68,7 +68,7 @@ public class ReactiveMessagePipelineE2ETest {
ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
.topic(topicName).build();
- messageSender.send(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
+ messageSender.sendMany(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
List<String> messages = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(100);
@@ -102,7 +102,7 @@ public class ReactiveMessagePipelineE2ETest {
List<MessageSpec<Integer>> messageSpecs = generateRandomOrderedMessagesWhereSingleKeyIsOrdered(
messageOrderScenario);
- messageSender.send(Flux.fromIterable(messageSpecs)).blockLast();
+ messageSender.sendMany(Flux.fromIterable(messageSpecs)).blockLast();
ConcurrentMap<Integer, List<Integer>> messages = new ConcurrentHashMap<>();
CountDownLatch latch = new CountDownLatch(messageSpecs.size());
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
index a82fd3b..f6dafe7 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
@@ -46,11 +46,11 @@ public class ReactiveMessageReaderE2ETest {
ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
.cache(producerCache).topic(topicName).build();
- messageSender.send(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
+ messageSender.sendMany(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
ReactiveMessageReader<String> messageReader = reactivePulsarClient.messageReader(Schema.STRING)
.topic(topicName).build();
- List<String> messages = messageReader.readMessages().map(Message::getValue).collectList().block();
+ List<String> messages = messageReader.readMany().map(Message::getValue).collectList().block();
assertThat(messages).isEqualTo(Flux.range(1, 100).map(Object::toString).collectList().block());
}
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
index 078a0d6..c55379a 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
@@ -63,7 +63,7 @@ public class ReactiveMessageSenderE2ETest {
ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
.topic(topicName).maxInflight(1).build();
- MessageId messageId = messageSender.send(MessageSpec.of("Hello world!")).block();
+ MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
assertThat(messageId).isNotNull();
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
@@ -86,7 +86,7 @@ public class ReactiveMessageSenderE2ETest {
ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
.cache(producerCache).maxInflight(1).topic(topicName).build();
- MessageId messageId = messageSender.send(MessageSpec.of("Hello world!")).block();
+ MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
assertThat(messageId).isNotNull();
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
index cdd44e7..5240781 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
@@ -57,9 +57,9 @@ class AdaptedReactiveMessageConsumer<T> implements ReactiveMessageConsumer<T> {
}
@Override
- public <R> Mono<R> consumeMessage(Function<Mono<Message<T>>, Mono<MessageResult<R>>> messageHandler) {
+ public <R> Mono<R> consumeOne(Function<Mono<Message<T>>, Publisher<MessageResult<R>>> messageHandler) {
return createReactiveConsumerAdapter().usingConsumer((consumer) -> Mono.using(this::pinAcknowledgeScheduler,
- (pinnedAcknowledgeScheduler) -> messageHandler.apply(readNextMessage(consumer)).delayUntil(
+ (pinnedAcknowledgeScheduler) -> Mono.from(messageHandler.apply(readNextMessage(consumer))).delayUntil(
(messageResult) -> handleAcknowledgement(consumer, messageResult, pinnedAcknowledgeScheduler))
.handle(this::handleMessageResult),
Scheduler::dispose));
@@ -206,7 +206,7 @@ class AdaptedReactiveMessageConsumer<T> implements ReactiveMessageConsumer<T> {
}
@Override
- public <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, Publisher<MessageResult<R>>> messageHandler) {
+ public <R> Flux<R> consumeMany(Function<Flux<Message<T>>, Publisher<MessageResult<R>>> messageHandler) {
return createReactiveConsumerAdapter().usingConsumerMany((consumer) -> Flux.using(
this::pinAcknowledgeScheduler, (
pinnedAcknowledgeScheduler) -> Flux
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java
index 76abb2e..e06a84e 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java
@@ -127,13 +127,13 @@ class AdaptedReactiveMessageReader<T> implements ReactiveMessageReader<T> {
}
@Override
- public Mono<Message<T>> readMessage() {
+ public Mono<Message<T>> readOne() {
return createReactiveReaderAdapter(this.startAtSpec)
.usingReader((reader) -> readNextMessage(reader, this.endOfStreamAction));
}
@Override
- public Flux<Message<T>> readMessages() {
+ public Flux<Message<T>> readMany() {
return createReactiveReaderAdapter(this.startAtSpec).usingReaderMany((reader) -> {
Mono<Message<T>> messageMono = readNextMessage(reader, this.endOfStreamAction);
if (this.endOfStreamAction == EndOfStreamAction.COMPLETE) {
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
index 038b885..0471b7c 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
@@ -179,7 +179,7 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
}
@Override
- public Mono<MessageId> send(MessageSpec<T> messageSpec) {
+ public Mono<MessageId> sendOne(MessageSpec<T> messageSpec) {
return createReactiveProducerAdapter().usingProducer((producer) -> createMessageMono(messageSpec, producer));
}
@@ -192,7 +192,7 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
}
@Override
- public Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs) {
+ public Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs) {
return createReactiveProducerAdapter().usingProducerMany((producer) ->
// TODO: ensure that inner publishers are subscribed in order so that message
// order is retained
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
index 312fa48..51a966d 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
@@ -25,9 +25,9 @@ import reactor.core.publisher.Mono;
public interface ReactiveMessageConsumer<T> {
- <R> Mono<R> consumeMessage(Function<Mono<Message<T>>, Mono<MessageResult<R>>> messageHandler);
+ <R> Mono<R> consumeOne(Function<Mono<Message<T>>, Publisher<MessageResult<R>>> messageHandler);
- <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, Publisher<MessageResult<R>>> messageHandler);
+ <R> Flux<R> consumeMany(Function<Flux<Message<T>>, Publisher<MessageResult<R>>> messageHandler);
/**
* Creates the Pulsar Consumer and immediately closes it. This is useful for creating
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java
index 9392b26..6b76334 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java
@@ -22,8 +22,8 @@ import reactor.core.publisher.Mono;
public interface ReactiveMessageReader<T> {
- Mono<Message<T>> readMessage();
+ Mono<Message<T>> readOne();
- Flux<Message<T>> readMessages();
+ Flux<Message<T>> readMany();
}
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
index 50640dc..2d5fb83 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
@@ -28,7 +28,7 @@ public interface ReactiveMessageSender<T> {
* @param messageSpec the spec of the message to send
* @return a publisher that will emit one message id and complete
*/
- Mono<MessageId> send(MessageSpec<T> messageSpec);
+ Mono<MessageId> sendOne(MessageSpec<T> messageSpec);
/**
* Send multiple messages and get the associated message ids in the same order as the
@@ -37,6 +37,6 @@ public interface ReactiveMessageSender<T> {
* @return a publisher that will emit a message id per message successfully sent in
* the order that they have been sent
*/
- Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs);
+ Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs);
}
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
index bf7d265..c094628 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
@@ -76,7 +76,7 @@ class DefaultReactiveMessagePipeline<T> implements ReactiveMessagePipeline {
this.groupingFunction = groupingFunction;
this.concurrency = concurrency;
this.maxInflight = maxInflight;
- this.pipeline = messageConsumer.consumeMessages(this::createMessageConsumer).then().transform(transformer)
+ this.pipeline = messageConsumer.consumeMany(this::createMessageConsumer).then().transform(transformer)
.transform(this::decoratePipeline);
}