You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/10/27 16:40:29 UTC

[pulsar-client-reactive] branch main updated: Rename methods to use naming convention of [action]One and [action]Many (#14)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 3831ea6  Rename methods to use naming convention of [action]One and [action]Many (#14)
3831ea6 is described below

commit 3831ea64281050c42d7dd9da9b630bf7968b395d
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Oct 27 19:40:24 2022 +0300

    Rename methods to use naming convention of [action]One and [action]Many (#14)
    
    * Rename consume* and read* methods (the consumeOne handler now returns a Publisher instead of a Mono)
    
    * Rename send methods to be sendOne and sendMany
---
 .../reactive/client/adapter/ReactiveMessageConsumerE2ETest.java     | 6 +++---
 .../reactive/client/adapter/ReactiveMessagePipelineE2ETest.java     | 4 ++--
 .../reactive/client/adapter/ReactiveMessageReaderE2ETest.java       | 4 ++--
 .../reactive/client/adapter/ReactiveMessageSenderE2ETest.java       | 4 ++--
 .../client/internal/adapter/AdaptedReactiveMessageConsumer.java     | 6 +++---
 .../client/internal/adapter/AdaptedReactiveMessageReader.java       | 4 ++--
 .../client/internal/adapter/AdaptedReactiveMessageSender.java       | 4 ++--
 .../apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java  | 4 ++--
 .../apache/pulsar/reactive/client/api/ReactiveMessageReader.java    | 4 ++--
 .../apache/pulsar/reactive/client/api/ReactiveMessageSender.java    | 4 ++--
 .../client/internal/api/DefaultReactiveMessagePipeline.java         | 2 +-
 11 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
index 833ce88..ed617de 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
@@ -48,12 +48,12 @@ public class ReactiveMessageConsumerE2ETest {
 
 			ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
 					.cache(producerCache).topic(topicName).build();
-			messageSender.send(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
+			messageSender.sendMany(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
 
 			ReactiveMessageConsumer<String> messageConsumer = reactivePulsarClient.messageConsumer(Schema.STRING)
 					.topic(topicName).subscriptionName("sub").build();
 			List<String> messages = messageConsumer
-					.consumeMessages((messageFlux) -> messageFlux
+					.consumeMany((messageFlux) -> messageFlux
 							.map((message) -> MessageResult.acknowledge(message.getMessageId(), message.getValue())))
 					.take(Duration.ofSeconds(2)).collectList().block();
 
@@ -61,7 +61,7 @@ public class ReactiveMessageConsumerE2ETest {
 
 			// should have acknowledged all messages
 			List<Message<String>> remainingMessages = messageConsumer
-					.consumeMessages((messageFlux) -> messageFlux.map(MessageResult::acknowledgeAndReturn))
+					.consumeMany((messageFlux) -> messageFlux.map(MessageResult::acknowledgeAndReturn))
 					.take(Duration.ofSeconds(2)).collectList().block();
 			assertThat(remainingMessages).isEmpty();
 		}
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
index 5b69a51..04c8c8d 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
@@ -68,7 +68,7 @@ public class ReactiveMessagePipelineE2ETest {
 
 			ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
 					.topic(topicName).build();
-			messageSender.send(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
+			messageSender.sendMany(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
 
 			List<String> messages = Collections.synchronizedList(new ArrayList<>());
 			CountDownLatch latch = new CountDownLatch(100);
@@ -102,7 +102,7 @@ public class ReactiveMessagePipelineE2ETest {
 			List<MessageSpec<Integer>> messageSpecs = generateRandomOrderedMessagesWhereSingleKeyIsOrdered(
 					messageOrderScenario);
 
-			messageSender.send(Flux.fromIterable(messageSpecs)).blockLast();
+			messageSender.sendMany(Flux.fromIterable(messageSpecs)).blockLast();
 
 			ConcurrentMap<Integer, List<Integer>> messages = new ConcurrentHashMap<>();
 			CountDownLatch latch = new CountDownLatch(messageSpecs.size());
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
index a82fd3b..f6dafe7 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
@@ -46,11 +46,11 @@ public class ReactiveMessageReaderE2ETest {
 
 			ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
 					.cache(producerCache).topic(topicName).build();
-			messageSender.send(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
+			messageSender.sendMany(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
 
 			ReactiveMessageReader<String> messageReader = reactivePulsarClient.messageReader(Schema.STRING)
 					.topic(topicName).build();
-			List<String> messages = messageReader.readMessages().map(Message::getValue).collectList().block();
+			List<String> messages = messageReader.readMany().map(Message::getValue).collectList().block();
 
 			assertThat(messages).isEqualTo(Flux.range(1, 100).map(Object::toString).collectList().block());
 		}
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
index 078a0d6..c55379a 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
@@ -63,7 +63,7 @@ public class ReactiveMessageSenderE2ETest {
 
 			ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
 					.topic(topicName).maxInflight(1).build();
-			MessageId messageId = messageSender.send(MessageSpec.of("Hello world!")).block();
+			MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
 			assertThat(messageId).isNotNull();
 
 			Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
@@ -86,7 +86,7 @@ public class ReactiveMessageSenderE2ETest {
 
 			ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
 					.cache(producerCache).maxInflight(1).topic(topicName).build();
-			MessageId messageId = messageSender.send(MessageSpec.of("Hello world!")).block();
+			MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
 			assertThat(messageId).isNotNull();
 
 			Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
index cdd44e7..5240781 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
@@ -57,9 +57,9 @@ class AdaptedReactiveMessageConsumer<T> implements ReactiveMessageConsumer<T> {
 	}
 
 	@Override
-	public <R> Mono<R> consumeMessage(Function<Mono<Message<T>>, Mono<MessageResult<R>>> messageHandler) {
+	public <R> Mono<R> consumeOne(Function<Mono<Message<T>>, Publisher<MessageResult<R>>> messageHandler) {
 		return createReactiveConsumerAdapter().usingConsumer((consumer) -> Mono.using(this::pinAcknowledgeScheduler,
-				(pinnedAcknowledgeScheduler) -> messageHandler.apply(readNextMessage(consumer)).delayUntil(
+				(pinnedAcknowledgeScheduler) -> Mono.from(messageHandler.apply(readNextMessage(consumer))).delayUntil(
 						(messageResult) -> handleAcknowledgement(consumer, messageResult, pinnedAcknowledgeScheduler))
 						.handle(this::handleMessageResult),
 				Scheduler::dispose));
@@ -206,7 +206,7 @@ class AdaptedReactiveMessageConsumer<T> implements ReactiveMessageConsumer<T> {
 	}
 
 	@Override
-	public <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, Publisher<MessageResult<R>>> messageHandler) {
+	public <R> Flux<R> consumeMany(Function<Flux<Message<T>>, Publisher<MessageResult<R>>> messageHandler) {
 		return createReactiveConsumerAdapter().usingConsumerMany((consumer) -> Flux.using(
 				this::pinAcknowledgeScheduler, (
 						pinnedAcknowledgeScheduler) -> Flux
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java
index 76abb2e..e06a84e 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java
@@ -127,13 +127,13 @@ class AdaptedReactiveMessageReader<T> implements ReactiveMessageReader<T> {
 	}
 
 	@Override
-	public Mono<Message<T>> readMessage() {
+	public Mono<Message<T>> readOne() {
 		return createReactiveReaderAdapter(this.startAtSpec)
 				.usingReader((reader) -> readNextMessage(reader, this.endOfStreamAction));
 	}
 
 	@Override
-	public Flux<Message<T>> readMessages() {
+	public Flux<Message<T>> readMany() {
 		return createReactiveReaderAdapter(this.startAtSpec).usingReaderMany((reader) -> {
 			Mono<Message<T>> messageMono = readNextMessage(reader, this.endOfStreamAction);
 			if (this.endOfStreamAction == EndOfStreamAction.COMPLETE) {
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
index 038b885..0471b7c 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
@@ -179,7 +179,7 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
 	}
 
 	@Override
-	public Mono<MessageId> send(MessageSpec<T> messageSpec) {
+	public Mono<MessageId> sendOne(MessageSpec<T> messageSpec) {
 		return createReactiveProducerAdapter().usingProducer((producer) -> createMessageMono(messageSpec, producer));
 	}
 
@@ -192,7 +192,7 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
 	}
 
 	@Override
-	public Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs) {
+	public Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs) {
 		return createReactiveProducerAdapter().usingProducerMany((producer) ->
 		// TODO: ensure that inner publishers are subscribed in order so that message
 		// order is retained
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
index 312fa48..51a966d 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
@@ -25,9 +25,9 @@ import reactor.core.publisher.Mono;
 
 public interface ReactiveMessageConsumer<T> {
 
-	<R> Mono<R> consumeMessage(Function<Mono<Message<T>>, Mono<MessageResult<R>>> messageHandler);
+	<R> Mono<R> consumeOne(Function<Mono<Message<T>>, Publisher<MessageResult<R>>> messageHandler);
 
-	<R> Flux<R> consumeMessages(Function<Flux<Message<T>>, Publisher<MessageResult<R>>> messageHandler);
+	<R> Flux<R> consumeMany(Function<Flux<Message<T>>, Publisher<MessageResult<R>>> messageHandler);
 
 	/**
 	 * Creates the Pulsar Consumer and immediately closes it. This is useful for creating
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java
index 9392b26..6b76334 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java
@@ -22,8 +22,8 @@ import reactor.core.publisher.Mono;
 
 public interface ReactiveMessageReader<T> {
 
-	Mono<Message<T>> readMessage();
+	Mono<Message<T>> readOne();
 
-	Flux<Message<T>> readMessages();
+	Flux<Message<T>> readMany();
 
 }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
index 50640dc..2d5fb83 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
@@ -28,7 +28,7 @@ public interface ReactiveMessageSender<T> {
 	 * @param messageSpec the spec of the message to send
 	 * @return a publisher that will emit one message id and complete
 	 */
-	Mono<MessageId> send(MessageSpec<T> messageSpec);
+	Mono<MessageId> sendOne(MessageSpec<T> messageSpec);
 
 	/**
 	 * Send multiple messages and get the associated message ids in the same order as the
@@ -37,6 +37,6 @@ public interface ReactiveMessageSender<T> {
 	 * @return a publisher that will emit a message id per message successfully sent in
 	 * the order that they have been sent
 	 */
-	Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs);
+	Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs);
 
 }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
index bf7d265..c094628 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
@@ -76,7 +76,7 @@ class DefaultReactiveMessagePipeline<T> implements ReactiveMessagePipeline {
 		this.groupingFunction = groupingFunction;
 		this.concurrency = concurrency;
 		this.maxInflight = maxInflight;
-		this.pipeline = messageConsumer.consumeMessages(this::createMessageConsumer).then().transform(transformer)
+		this.pipeline = messageConsumer.consumeMany(this::createMessageConsumer).then().transform(transformer)
 				.transform(this::decoratePipeline);
 	}