You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/12/02 15:38:45 UTC

[pulsar-client-reactive] branch main updated: Some cleanups (#79)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new e2e06c0  Some cleanups (#79)
e2e06c0 is described below

commit e2e06c049653a975132e65a33b771c10621d5d0f
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Dec 2 16:38:41 2022 +0100

    Some cleanups (#79)
---
 .../adapter/ReactiveMessagePipelineE2ETest.java    |  6 ++--
 .../adapter/ReactiveMessageSenderE2ETest.java      | 42 +++++++++++-----------
 .../AdaptedReactiveMessageConsumerBuilder.java     |  2 +-
 .../ConcurrentHashMapProducerCacheProvider.java    |  4 +--
 .../client/internal/adapter/ProducerCache.java     | 10 ++----
 .../internal/adapter/ProducerCacheEntry.java       | 10 ++----
 .../client/internal/adapter/ProducerCacheKey.java  |  4 ---
 .../internal/adapter/PulsarFutureAdapter.java      |  2 +-
 .../adapter/ReactiveConsumerAdapterFactory.java    |  2 +-
 .../internal/adapter/ReactiveProducerAdapter.java  |  4 +--
 .../adapter/ReactivePulsarResourceAdapter.java     |  8 -----
 .../adapter/ReactiveReaderAdapterFactory.java      |  2 +-
 .../reactive/client/api/MessageSpecBuilder.java    |  2 +-
 .../client/api/ReactiveMessageConsumerBuilder.java | 26 +++++++-------
 .../reactive/client/api/ReactivePulsarClient.java  |  4 +--
 .../internal/api/ApiImplementationFactory.java     |  2 +-
 .../internal/api/DefaultMessageSpecBuilder.java    |  2 +-
 .../api/GroupOrderedMessageProcessors.java         |  2 +-
 .../reactive/client/api/MessageSpecTest.java       |  3 +-
 19 files changed, 59 insertions(+), 78 deletions(-)

diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
index 25dd1f1..3914961 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
@@ -73,13 +73,13 @@ class ReactiveMessagePipelineE2ETest {
 			List<String> messages = Collections.synchronizedList(new ArrayList<>());
 			CountDownLatch latch = new CountDownLatch(100);
 
-			try (ReactiveMessagePipeline reactiveMessagePipeline = reactivePulsarClient.messageConsumer(Schema.STRING)
+			try (ReactiveMessagePipeline ignored = reactivePulsarClient.messageConsumer(Schema.STRING)
 					.subscriptionName("sub").topic(topicName).build().messagePipeline()
 					.messageHandler((message) -> Mono.fromRunnable(() -> {
 						messages.add(message.getValue());
 						latch.countDown();
 					})).build().start()) {
-				latch.await(5, TimeUnit.SECONDS);
+				assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
 				assertThat(messages).isEqualTo(Flux.range(1, 100).map(Object::toString).collectList().block());
 			}
 		}
@@ -133,7 +133,7 @@ class ReactiveMessagePipelineE2ETest {
 			if (messageOrderScenario != MessageOrderScenario.NO_PARALLEL) {
 				reactiveMessageHandlerBuilder.concurrency(KEYS_COUNT).useKeyOrderedProcessing();
 			}
-			try (ReactiveMessagePipeline reactiveMessagePipeline = reactiveMessageHandlerBuilder.build().start()) {
+			try (ReactiveMessagePipeline ignored = reactiveMessageHandlerBuilder.build().start()) {
 				boolean latchCompleted = latch.await(5, TimeUnit.SECONDS);
 				assertThat(latchCompleted).as("processing of all messages should have completed").isTrue();
 				for (int i = 1; i <= KEYS_COUNT; i++) {
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
index 9a43150..bc40351 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
@@ -56,19 +56,20 @@ class ReactiveMessageSenderE2ETest {
 	void shouldSendMessageToTopic() throws PulsarClientException {
 		try (PulsarClient pulsarClient = SingletonPulsarContainer.createPulsarClient()) {
 			String topicName = "test" + UUID.randomUUID();
-			Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub")
-					.subscribe();
+			try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+					.subscriptionName("sub").subscribe()) {
 
-			ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
+				ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
 
-			ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
-					.topic(topicName).maxInflight(1).build();
-			MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
-			assertThat(messageId).isNotNull();
+				ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
+						.topic(topicName).maxInflight(1).build();
+				MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
+				assertThat(messageId).isNotNull();
 
-			Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
-			assertThat(message).isNotNull();
-			assertThat(message.getValue()).isEqualTo("Hello world!");
+				Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+				assertThat(message).isNotNull();
+				assertThat(message.getValue()).isEqualTo("Hello world!");
+			}
 		}
 	}
 
@@ -79,19 +80,20 @@ class ReactiveMessageSenderE2ETest {
 		try (PulsarClient pulsarClient = SingletonPulsarContainer.createPulsarClient();
 				ReactiveMessageSenderCache producerCache = cacheInstance) {
 			String topicName = "test" + UUID.randomUUID();
-			Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub")
-					.subscribe();
+			try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+					.subscriptionName("sub").subscribe()) {
 
-			ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
+				ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
 
-			ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
-					.cache(producerCache).maxInflight(1).topic(topicName).build();
-			MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
-			assertThat(messageId).isNotNull();
+				ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
+						.cache(producerCache).maxInflight(1).topic(topicName).build();
+				MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
+				assertThat(messageId).isNotNull();
 
-			Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
-			assertThat(message).isNotNull();
-			assertThat(message.getValue()).isEqualTo("Hello world!");
+				Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+				assertThat(message).isNotNull();
+				assertThat(message.getValue()).isEqualTo("Hello world!");
+			}
 		}
 	}
 
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
index 7df510f..df525f9 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
@@ -55,7 +55,7 @@ class AdaptedReactiveMessageConsumerBuilder<T> implements ReactiveMessageConsume
 
 	@Override
 	public ReactiveMessageConsumer<T> build() {
-		return new AdaptedReactiveMessageConsumer<T>(this.reactiveConsumerAdapterFactory, this.schema,
+		return new AdaptedReactiveMessageConsumer<>(this.reactiveConsumerAdapterFactory, this.schema,
 				toImmutableSpec());
 	}
 
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
index 9b04dbc..beef4c4 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
@@ -44,10 +44,10 @@ public class ConcurrentHashMapProducerCacheProvider implements ProducerCacheProv
 	}
 
 	@Override
-	public void close() throws Exception {
+	public void close() {
 		for (CompletableFuture<Object> future : this.cache.values()) {
 			future.thenAccept((value) -> {
-				if (value != null && value instanceof AutoCloseable) {
+				if (value instanceof AutoCloseable) {
 					try {
 						((AutoCloseable) value).close();
 					}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
index a0a46b8..c538437 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
@@ -24,15 +24,11 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
 import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 class ProducerCache implements ReactiveMessageSenderCache {
 
-	private static final Logger log = LoggerFactory.getLogger(ProducerCache.class);
-
 	private final ProducerCacheProvider cacheProvider;
 
 	ProducerCache(ProducerCacheProvider cacheProvider) {
@@ -58,7 +54,7 @@ class ProducerCache implements ReactiveMessageSenderCache {
 		return Mono.usingWhen(this.leaseCacheEntry(cacheKey, producerMono, producerActionTransformer),
 				(producerCacheEntry) -> usingProducerAction.apply(producerCacheEntry.getProducer())
 						.as(producerCacheEntry::decorateProducerAction),
-				(producerCacheEntry) -> this.returnCacheEntry(producerCacheEntry));
+				this::returnCacheEntry);
 	}
 
 	private Mono<Object> returnCacheEntry(ProducerCacheEntry producerCacheEntry) {
@@ -77,12 +73,12 @@ class ProducerCache implements ReactiveMessageSenderCache {
 		return Flux.usingWhen(this.leaseCacheEntry(cacheKey, producerMono, producerActionTransformer),
 				(producerCacheEntry) -> usingProducerAction.apply(producerCacheEntry.getProducer())
 						.as(producerCacheEntry::decorateProducerAction),
-				(producerCacheEntry) -> this.returnCacheEntry(producerCacheEntry));
+				this::returnCacheEntry);
 	}
 
 	@Override
 	public void close() throws Exception {
-		if (this.cacheProvider instanceof AutoCloseable) {
+		if (this.cacheProvider != null) {
 			this.cacheProvider.close();
 		}
 	}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
index 12f4ced..1f185c1 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
@@ -33,9 +33,9 @@ class ProducerCacheEntry implements AutoCloseable {
 
 	private static final Logger log = LoggerFactory.getLogger(ProducerCacheEntry.class);
 
-	private final AtomicReference<Producer<?>> producer = new AtomicReference();
+	private final AtomicReference<Producer<?>> producer = new AtomicReference<>();
 
-	private final AtomicReference<Mono<? extends Producer<?>>> producerCreator = new AtomicReference();
+	private final AtomicReference<Mono<? extends Producer<?>>> producerCreator = new AtomicReference<>();
 
 	private final AtomicInteger activeLeases = new AtomicInteger(0);
 
@@ -69,10 +69,6 @@ class ProducerCacheEntry implements AutoCloseable {
 		}
 	}
 
-	int getActiveLeases() {
-		return this.activeLeases.get();
-	}
-
 	<T> Producer<T> getProducer() {
 		return (Producer<T>) this.producer.get();
 	}
@@ -92,7 +88,7 @@ class ProducerCacheEntry implements AutoCloseable {
 					}
 				}
 			}
-			return Mono.defer(() -> this.producerCreator.get()).filter(Producer::isConnected)
+			return Mono.defer(this.producerCreator::get).filter(Producer::isConnected)
 					.repeatWhenEmpty(5, (flux) -> flux.delayElements(Duration.ofSeconds(1))).thenReturn(this);
 		});
 	}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
index 4d3686d..5427b72 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
@@ -37,10 +37,6 @@ final class ProducerCacheKey {
 		this.schema = schema;
 	}
 
-	String getTopicName() {
-		return (this.producerConfigurationData != null) ? this.producerConfigurationData.getTopicName() : null;
-	}
-
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java
index 3dfb87c..747e3b8 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java
@@ -27,7 +27,7 @@ import reactor.core.publisher.Mono;
  * Stateful adapter from CompletableFuture to Mono which keeps a reference to the original
  * future so that it can be cancelled. Cancellation is necessary for some cases to release
  * resources.
- *
+ * <p>
  * There's additional logic to ignore Pulsar client's
  * {@link org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException} when
  * the Mono has been cancelled. This is to reduce unnecessary exceptions in logs.
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java
index 5f45ccd..2b1f1dc 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java
@@ -31,7 +31,7 @@ class ReactiveConsumerAdapterFactory {
 	}
 
 	<T> ReactiveConsumerAdapter<T> create(Function<PulsarClient, ConsumerBuilder<T>> consumerBuilderFactory) {
-		return new ReactiveConsumerAdapter<T>(this.pulsarClientSupplier, consumerBuilderFactory);
+		return new ReactiveConsumerAdapter<>(this.pulsarClientSupplier, consumerBuilderFactory);
 	}
 
 }
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
index d3e8f31..1ecacf8 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
@@ -80,7 +80,7 @@ class ReactiveProducerAdapter<T> {
 
 	private <R> Mono<R> usingUncachedProducer(Function<Producer<T>, Mono<R>> usingProducerAction) {
 		return Mono.usingWhen(createProducerMono(),
-				(producer) -> Mono.using(() -> this.producerActionTransformer.get(),
+				(producer) -> Mono.using(this.producerActionTransformer::get,
 						(transformer) -> usingProducerAction.apply(producer)
 								.as((mono) -> Mono.from(transformer.transform(mono))),
 						Disposable::dispose),
@@ -106,7 +106,7 @@ class ReactiveProducerAdapter<T> {
 	}
 
 	private <R> Flux<R> usingUncachedProducerMany(Function<Producer<T>, Flux<R>> usingProducerAction) {
-		return Flux.usingWhen(createProducerMono(), (producer) -> Flux.using(() -> this.producerActionTransformer.get(),
+		return Flux.usingWhen(createProducerMono(), (producer) -> Flux.using(this.producerActionTransformer::get,
 				(transformer) -> usingProducerAction.apply(producer).as(transformer::transform), Disposable::dispose),
 				this::closeProducer);
 	}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java
index f3c1f5f..43b7522 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java
@@ -42,14 +42,6 @@ class ReactivePulsarResourceAdapter {
 		};
 	}
 
-	static ReactivePulsarResourceAdapter create(Supplier<PulsarClient> pulsarClientSupplier) {
-		return new ReactivePulsarResourceAdapter(pulsarClientSupplier);
-	}
-
-	static ReactivePulsarResourceAdapter create(PulsarClient pulsarClient) {
-		return create(() -> pulsarClient);
-	}
-
 	ReactiveProducerAdapterFactory producer() {
 		return new ReactiveProducerAdapterFactory(this.pulsarClientSupplier);
 	}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java
index c9dd6d1..1eb3a3a 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java
@@ -31,7 +31,7 @@ class ReactiveReaderAdapterFactory {
 	}
 
 	<T> ReactiveReaderAdapter<T> create(Function<PulsarClient, ReaderBuilder<T>> readerBuilderFactory) {
-		return new ReactiveReaderAdapter<T>(this.pulsarClientSupplier, readerBuilderFactory);
+		return new ReactiveReaderAdapter<>(this.pulsarClientSupplier, readerBuilderFactory);
 	}
 
 }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
index d49fb11..fff4967 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
@@ -127,7 +127,7 @@ public interface MessageSpecBuilder<T> {
 	 *
 	 * <p>
 	 * The timestamp is milliseconds and based on UTC (eg:
-	 * {@link System#currentTimeMillis()}.
+	 * {@link System#currentTimeMillis()}).
 	 *
 	 * <p>
 	 * <b>Note</b>: messages are only delivered with delay when a consumer is consuming
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
index 7e9f960..07b1d22 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
@@ -292,7 +292,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
 	 * Sets the consumer name.
 	 *
 	 * <p>
-	 * Consumer name is informative and it can be used to indentify a particular consumer
+	 * Consumer name is informative and it can be used to identify a particular consumer
 	 * instance from the topic stats.
 	 * @param consumerName the consumer name
 	 * @return the consumer builder instance
@@ -341,7 +341,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
 
 	/**
 	 * Sets the priority level for the consumer.
-	 *
+	 * <p>
 	 * <b>Shared subscription</b> Sets the priority level for the shared subscription
 	 * consumers to which the broker gives more priority while dispatching messages. Here,
 	 * the broker follows descending priorities. (eg: 0=max-priority, 1, 2,..)
@@ -430,13 +430,13 @@ public interface ReactiveMessageConsumerBuilder<T> {
 	 * The timeout needs to be greater than 1 second.
 	 *
 	 * <p>
-	 * By default, the acknowledge timeout is disabled and that means that messages
+	 * By default, the acknowledgement timeout is disabled and that means that messages
 	 * delivered to a consumer will not be re-delivered unless the consumer crashes.
 	 *
 	 * <p>
-	 * When enabling the acknowledge timeout, if a message is not acknowledged within the
-	 * specified timeout it will be re-delivered to the consumer (possibly to a different
-	 * consumer in case of a shared subscription).
+	 * When enabling the acknowledgement timeout, if a message is not acknowledged within
+	 * the specified timeout it will be re-delivered to the consumer (possibly to a
+	 * different consumer in case of a shared subscription).
 	 * @param ackTimeout the timeout for unacknowledged messages.
 	 * @return the consumer builder instance
 	 * @see ConsumerBuilder#ackTimeout(long, TimeUnit)
@@ -450,10 +450,10 @@ public interface ReactiveMessageConsumerBuilder<T> {
 	 * Sets the granularity of the ack-timeout redelivery.
 	 *
 	 * <p>
-	 * By default, the tick time is set to 1 second. Using an higher tick time will reduce
+	 * By default, the tick time is set to 1 second. Using a higher tick time will reduce
 	 * the memory overhead to track messages when the ack-timeout is set to bigger values
 	 * (eg: 1hour).
-	 * @param ackTimeoutTickTime the minimum precision for the acknowledge timeout
+	 * @param ackTimeoutTickTime the minimum precision for the acknowledgement timeout
 	 * messages tracker
 	 * @return the consumer builder instance
 	 * @see ConsumerBuilder#ackTimeoutTickTime(long, TimeUnit)
@@ -528,7 +528,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
 	 * Sets a dead letter policy for the consumer.
 	 *
 	 * <p>
-	 * By default messages are redelivered indefinitely if they are not acknowledged. By
+	 * By default, messages are redelivered indefinitely if they are not acknowledged. By
 	 * using a dead letter mechanism, messages that have reached the max redelivery count
 	 * will be acknowledged automatically and sent to the configured dead letter topic.
 	 *
@@ -539,7 +539,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
 	 *          .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build())
 	 *          .build();
 	 * </pre> By default, the dead letter topic name is {TopicName}-{Subscription}-DLQ. You
-	 * can set o set a custom dead letter topic name like this: <pre>
+	 * can set a custom dead letter topic name like this: <pre>
 	 * client.messageConsumer(Schema.BYTES)
 	 *          .deadLetterPolicy(DeadLetterPolicy
 	 *              .builder()
@@ -604,7 +604,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
 	}
 
 	/**
-	 * Sets the maximum total receiver queue size across partitons.
+	 * Sets the maximum total receiver queue size across partitions.
 	 *
 	 * <p>
 	 * This setting is used to reduce the receiver queue size for individual partitions
@@ -685,8 +685,8 @@ public interface ReactiveMessageConsumerBuilder<T> {
 	 * </pre> Buffering large number of outstanding uncompleted chunked messages can
 	 * create memory pressure. It can be guarded by providing a
 	 * {@code maxPendingChunkedMessage} threshold. Once the consumer reaches this
-	 * threshold, it drops the outstanding unchunked-messages by silently acknowledging or
-	 * asking the broker to redeliver later by marking it unacknowledged. This behavior
+	 * threshold, it drops the outstanding non-chunked messages by silently acknowledging
+	 * or asking the broker to redeliver later by marking it unacknowledged. This behavior
 	 * can be controlled by setting {@link #autoAckOldestChunkedMessageOnQueueFull}. The
 	 * default value is 10.
 	 * @param maxPendingChunkedMessage the maximum pending chunked messages.
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
index 9a9d248..882a0ed 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
@@ -19,8 +19,8 @@ package org.apache.pulsar.reactive.client.api;
 import org.apache.pulsar.client.api.Schema;
 
 /**
- * Apache Pulsar Reactive Client interface
- *
+ * Apache Pulsar Reactive Client interface.
+ * <p>
  * Contains methods to create builders for {@link ReactiveMessageSender},
  * {@link ReactiveMessageReader} and {@link ReactiveMessageConsumer} instances.
  */
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java
index 95285b2..38a3ac3 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java
@@ -53,7 +53,7 @@ public final class ApiImplementationFactory {
 	 * @return the result of the message processing
 	 */
 	public static <T> MessageResult<T> negativeAcknowledge(MessageId messageId, T value) {
-		return new DefaultMessageResult<T>(messageId, false, value);
+		return new DefaultMessageResult<>(messageId, false, value);
 	}
 
 	/**
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
index 7910116..eb19e99 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
@@ -139,7 +139,7 @@ class DefaultMessageSpecBuilder<T> implements MessageSpecBuilder<T> {
 
 	@Override
 	public MessageSpec<T> build() {
-		return new DefaultMessageSpec<T>(this.key, this.orderingKey, this.keyBytes, this.value, this.properties,
+		return new DefaultMessageSpec<>(this.key, this.orderingKey, this.keyBytes, this.value, this.properties,
 				this.eventTime, this.sequenceId, this.replicationClusters, this.disableReplication, this.deliverAt,
 				this.deliverAfterDelay, this.deliverAfterUnit);
 	}
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java
index 5e30171..638b116 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java
@@ -29,7 +29,7 @@ import reactor.util.concurrent.Queues;
 /**
  * Functions for implementing In-order parallel processing for Pulsar messages using
  * Project Reactor.
- *
+ * <p>
  * A processing group is resolved for each message based on the message's key. The message
  * flux is split into group fluxes based on the processing group. Each group flux
  * processes its messages in order (one-by-one). Multiple group fluxes are processed in
diff --git a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
index 206a6f3..b7e35d7 100644
--- a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
+++ b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
 import org.junit.jupiter.api.Test;
@@ -101,7 +100,7 @@ class MessageSpecTest {
 		private Duration deliverAfter;
 
 		@Override
-		public MessageId send() throws PulsarClientException {
+		public MessageId send() {
 			throw new IllegalStateException("not implemented");
 		}