You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/12/02 15:38:45 UTC
[pulsar-client-reactive] branch main updated: Some cleanups (#79)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new e2e06c0 Some cleanups (#79)
e2e06c0 is described below
commit e2e06c049653a975132e65a33b771c10621d5d0f
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Dec 2 16:38:41 2022 +0100
Some cleanups (#79)
---
.../adapter/ReactiveMessagePipelineE2ETest.java | 6 ++--
.../adapter/ReactiveMessageSenderE2ETest.java | 42 +++++++++++-----------
.../AdaptedReactiveMessageConsumerBuilder.java | 2 +-
.../ConcurrentHashMapProducerCacheProvider.java | 4 +--
.../client/internal/adapter/ProducerCache.java | 10 ++----
.../internal/adapter/ProducerCacheEntry.java | 10 ++----
.../client/internal/adapter/ProducerCacheKey.java | 4 ---
.../internal/adapter/PulsarFutureAdapter.java | 2 +-
.../adapter/ReactiveConsumerAdapterFactory.java | 2 +-
.../internal/adapter/ReactiveProducerAdapter.java | 4 +--
.../adapter/ReactivePulsarResourceAdapter.java | 8 -----
.../adapter/ReactiveReaderAdapterFactory.java | 2 +-
.../reactive/client/api/MessageSpecBuilder.java | 2 +-
.../client/api/ReactiveMessageConsumerBuilder.java | 26 +++++++-------
.../reactive/client/api/ReactivePulsarClient.java | 4 +--
.../internal/api/ApiImplementationFactory.java | 2 +-
.../internal/api/DefaultMessageSpecBuilder.java | 2 +-
.../api/GroupOrderedMessageProcessors.java | 2 +-
.../reactive/client/api/MessageSpecTest.java | 3 +-
19 files changed, 59 insertions(+), 78 deletions(-)
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
index 25dd1f1..3914961 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
@@ -73,13 +73,13 @@ class ReactiveMessagePipelineE2ETest {
List<String> messages = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(100);
- try (ReactiveMessagePipeline reactiveMessagePipeline = reactivePulsarClient.messageConsumer(Schema.STRING)
+ try (ReactiveMessagePipeline ignored = reactivePulsarClient.messageConsumer(Schema.STRING)
.subscriptionName("sub").topic(topicName).build().messagePipeline()
.messageHandler((message) -> Mono.fromRunnable(() -> {
messages.add(message.getValue());
latch.countDown();
})).build().start()) {
- latch.await(5, TimeUnit.SECONDS);
+ assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(messages).isEqualTo(Flux.range(1, 100).map(Object::toString).collectList().block());
}
}
@@ -133,7 +133,7 @@ class ReactiveMessagePipelineE2ETest {
if (messageOrderScenario != MessageOrderScenario.NO_PARALLEL) {
reactiveMessageHandlerBuilder.concurrency(KEYS_COUNT).useKeyOrderedProcessing();
}
- try (ReactiveMessagePipeline reactiveMessagePipeline = reactiveMessageHandlerBuilder.build().start()) {
+ try (ReactiveMessagePipeline ignored = reactiveMessageHandlerBuilder.build().start()) {
boolean latchCompleted = latch.await(5, TimeUnit.SECONDS);
assertThat(latchCompleted).as("processing of all messages should have completed").isTrue();
for (int i = 1; i <= KEYS_COUNT; i++) {
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
index 9a43150..bc40351 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
@@ -56,19 +56,20 @@ class ReactiveMessageSenderE2ETest {
void shouldSendMessageToTopic() throws PulsarClientException {
try (PulsarClient pulsarClient = SingletonPulsarContainer.createPulsarClient()) {
String topicName = "test" + UUID.randomUUID();
- Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub")
- .subscribe();
+ try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+ .subscriptionName("sub").subscribe()) {
- ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
+ ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
- ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
- .topic(topicName).maxInflight(1).build();
- MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
- assertThat(messageId).isNotNull();
+ ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
+ .topic(topicName).maxInflight(1).build();
+ MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
+ assertThat(messageId).isNotNull();
- Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
- assertThat(message).isNotNull();
- assertThat(message.getValue()).isEqualTo("Hello world!");
+ Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+ assertThat(message).isNotNull();
+ assertThat(message.getValue()).isEqualTo("Hello world!");
+ }
}
}
@@ -79,19 +80,20 @@ class ReactiveMessageSenderE2ETest {
try (PulsarClient pulsarClient = SingletonPulsarContainer.createPulsarClient();
ReactiveMessageSenderCache producerCache = cacheInstance) {
String topicName = "test" + UUID.randomUUID();
- Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub")
- .subscribe();
+ try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+ .subscriptionName("sub").subscribe()) {
- ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
+ ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
- ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
- .cache(producerCache).maxInflight(1).topic(topicName).build();
- MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
- assertThat(messageId).isNotNull();
+ ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
+ .cache(producerCache).maxInflight(1).topic(topicName).build();
+ MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
+ assertThat(messageId).isNotNull();
- Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
- assertThat(message).isNotNull();
- assertThat(message.getValue()).isEqualTo("Hello world!");
+ Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+ assertThat(message).isNotNull();
+ assertThat(message.getValue()).isEqualTo("Hello world!");
+ }
}
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
index 7df510f..df525f9 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
@@ -55,7 +55,7 @@ class AdaptedReactiveMessageConsumerBuilder<T> implements ReactiveMessageConsume
@Override
public ReactiveMessageConsumer<T> build() {
- return new AdaptedReactiveMessageConsumer<T>(this.reactiveConsumerAdapterFactory, this.schema,
+ return new AdaptedReactiveMessageConsumer<>(this.reactiveConsumerAdapterFactory, this.schema,
toImmutableSpec());
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
index 9b04dbc..beef4c4 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java
@@ -44,10 +44,10 @@ public class ConcurrentHashMapProducerCacheProvider implements ProducerCacheProv
}
@Override
- public void close() throws Exception {
+ public void close() {
for (CompletableFuture<Object> future : this.cache.values()) {
future.thenAccept((value) -> {
- if (value != null && value instanceof AutoCloseable) {
+ if (value instanceof AutoCloseable) {
try {
((AutoCloseable) value).close();
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
index a0a46b8..c538437 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.java
@@ -24,15 +24,11 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
class ProducerCache implements ReactiveMessageSenderCache {
- private static final Logger log = LoggerFactory.getLogger(ProducerCache.class);
-
private final ProducerCacheProvider cacheProvider;
ProducerCache(ProducerCacheProvider cacheProvider) {
@@ -58,7 +54,7 @@ class ProducerCache implements ReactiveMessageSenderCache {
return Mono.usingWhen(this.leaseCacheEntry(cacheKey, producerMono, producerActionTransformer),
(producerCacheEntry) -> usingProducerAction.apply(producerCacheEntry.getProducer())
.as(producerCacheEntry::decorateProducerAction),
- (producerCacheEntry) -> this.returnCacheEntry(producerCacheEntry));
+ this::returnCacheEntry);
}
private Mono<Object> returnCacheEntry(ProducerCacheEntry producerCacheEntry) {
@@ -77,12 +73,12 @@ class ProducerCache implements ReactiveMessageSenderCache {
return Flux.usingWhen(this.leaseCacheEntry(cacheKey, producerMono, producerActionTransformer),
(producerCacheEntry) -> usingProducerAction.apply(producerCacheEntry.getProducer())
.as(producerCacheEntry::decorateProducerAction),
- (producerCacheEntry) -> this.returnCacheEntry(producerCacheEntry));
+ this::returnCacheEntry);
}
@Override
public void close() throws Exception {
- if (this.cacheProvider instanceof AutoCloseable) {
+ if (this.cacheProvider != null) {
this.cacheProvider.close();
}
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
index 12f4ced..1f185c1 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java
@@ -33,9 +33,9 @@ class ProducerCacheEntry implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(ProducerCacheEntry.class);
- private final AtomicReference<Producer<?>> producer = new AtomicReference();
+ private final AtomicReference<Producer<?>> producer = new AtomicReference<>();
- private final AtomicReference<Mono<? extends Producer<?>>> producerCreator = new AtomicReference();
+ private final AtomicReference<Mono<? extends Producer<?>>> producerCreator = new AtomicReference<>();
private final AtomicInteger activeLeases = new AtomicInteger(0);
@@ -69,10 +69,6 @@ class ProducerCacheEntry implements AutoCloseable {
}
}
- int getActiveLeases() {
- return this.activeLeases.get();
- }
-
<T> Producer<T> getProducer() {
return (Producer<T>) this.producer.get();
}
@@ -92,7 +88,7 @@ class ProducerCacheEntry implements AutoCloseable {
}
}
}
- return Mono.defer(() -> this.producerCreator.get()).filter(Producer::isConnected)
+ return Mono.defer(this.producerCreator::get).filter(Producer::isConnected)
.repeatWhenEmpty(5, (flux) -> flux.delayElements(Duration.ofSeconds(1))).thenReturn(this);
});
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
index 4d3686d..5427b72 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
@@ -37,10 +37,6 @@ final class ProducerCacheKey {
this.schema = schema;
}
- String getTopicName() {
- return (this.producerConfigurationData != null) ? this.producerConfigurationData.getTopicName() : null;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java
index 3dfb87c..747e3b8 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/PulsarFutureAdapter.java
@@ -27,7 +27,7 @@ import reactor.core.publisher.Mono;
* Stateful adapter from CompletableFuture to Mono which keeps a reference to the original
* future so that it can be cancelled. Cancellation is necessary for some cases to release
* resources.
- *
+ * <p>
* There's additional logic to ignore Pulsar client's
* {@link org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException} when
* the Mono has been cancelled. This is to reduce unnecessary exceptions in logs.
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java
index 5f45ccd..2b1f1dc 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapterFactory.java
@@ -31,7 +31,7 @@ class ReactiveConsumerAdapterFactory {
}
<T> ReactiveConsumerAdapter<T> create(Function<PulsarClient, ConsumerBuilder<T>> consumerBuilderFactory) {
- return new ReactiveConsumerAdapter<T>(this.pulsarClientSupplier, consumerBuilderFactory);
+ return new ReactiveConsumerAdapter<>(this.pulsarClientSupplier, consumerBuilderFactory);
}
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
index d3e8f31..1ecacf8 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
@@ -80,7 +80,7 @@ class ReactiveProducerAdapter<T> {
private <R> Mono<R> usingUncachedProducer(Function<Producer<T>, Mono<R>> usingProducerAction) {
return Mono.usingWhen(createProducerMono(),
- (producer) -> Mono.using(() -> this.producerActionTransformer.get(),
+ (producer) -> Mono.using(this.producerActionTransformer::get,
(transformer) -> usingProducerAction.apply(producer)
.as((mono) -> Mono.from(transformer.transform(mono))),
Disposable::dispose),
@@ -106,7 +106,7 @@ class ReactiveProducerAdapter<T> {
}
private <R> Flux<R> usingUncachedProducerMany(Function<Producer<T>, Flux<R>> usingProducerAction) {
- return Flux.usingWhen(createProducerMono(), (producer) -> Flux.using(() -> this.producerActionTransformer.get(),
+ return Flux.usingWhen(createProducerMono(), (producer) -> Flux.using(this.producerActionTransformer::get,
(transformer) -> usingProducerAction.apply(producer).as(transformer::transform), Disposable::dispose),
this::closeProducer);
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java
index f3c1f5f..43b7522 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactivePulsarResourceAdapter.java
@@ -42,14 +42,6 @@ class ReactivePulsarResourceAdapter {
};
}
- static ReactivePulsarResourceAdapter create(Supplier<PulsarClient> pulsarClientSupplier) {
- return new ReactivePulsarResourceAdapter(pulsarClientSupplier);
- }
-
- static ReactivePulsarResourceAdapter create(PulsarClient pulsarClient) {
- return create(() -> pulsarClient);
- }
-
ReactiveProducerAdapterFactory producer() {
return new ReactiveProducerAdapterFactory(this.pulsarClientSupplier);
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java
index c9dd6d1..1eb3a3a 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapterFactory.java
@@ -31,7 +31,7 @@ class ReactiveReaderAdapterFactory {
}
<T> ReactiveReaderAdapter<T> create(Function<PulsarClient, ReaderBuilder<T>> readerBuilderFactory) {
- return new ReactiveReaderAdapter<T>(this.pulsarClientSupplier, readerBuilderFactory);
+ return new ReactiveReaderAdapter<>(this.pulsarClientSupplier, readerBuilderFactory);
}
}
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
index d49fb11..fff4967 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageSpecBuilder.java
@@ -127,7 +127,7 @@ public interface MessageSpecBuilder<T> {
*
* <p>
* The timestamp is milliseconds and based on UTC (eg:
- * {@link System#currentTimeMillis()}.
+ * {@link System#currentTimeMillis()}).
*
* <p>
* <b>Note</b>: messages are only delivered with delay when a consumer is consuming
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
index 7e9f960..07b1d22 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
@@ -292,7 +292,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
* Sets the consumer name.
*
* <p>
- * Consumer name is informative and it can be used to indentify a particular consumer
+ * Consumer name is informative and it can be used to identify a particular consumer
* instance from the topic stats.
* @param consumerName the consumer name
* @return the consumer builder instance
@@ -341,7 +341,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
/**
* Sets the priority level for the consumer.
- *
+ * <p>
* <b>Shared subscription</b> Sets the priority level for the shared subscription
* consumers to which the broker gives more priority while dispatching messages. Here,
* the broker follows descending priorities. (eg: 0=max-priority, 1, 2,..)
@@ -430,13 +430,13 @@ public interface ReactiveMessageConsumerBuilder<T> {
* The timeout needs to be greater than 1 second.
*
* <p>
- * By default, the acknowledge timeout is disabled and that means that messages
+ * By default, the acknowledgement timeout is disabled and that means that messages
* delivered to a consumer will not be re-delivered unless the consumer crashes.
*
* <p>
- * When enabling the acknowledge timeout, if a message is not acknowledged within the
- * specified timeout it will be re-delivered to the consumer (possibly to a different
- * consumer in case of a shared subscription).
+ * When enabling the acknowledgement timeout, if a message is not acknowledged within
+ * the specified timeout it will be re-delivered to the consumer (possibly to a
+ * different consumer in case of a shared subscription).
* @param ackTimeout the timeout for unacknowledged messages.
* @return the consumer builder instance
* @see ConsumerBuilder#ackTimeout(long, TimeUnit)
@@ -450,10 +450,10 @@ public interface ReactiveMessageConsumerBuilder<T> {
* Sets the granularity of the ack-timeout redelivery.
*
* <p>
- * By default, the tick time is set to 1 second. Using an higher tick time will reduce
+ * By default, the tick time is set to 1 second. Using a higher tick time will reduce
* the memory overhead to track messages when the ack-timeout is set to bigger values
* (eg: 1hour).
- * @param ackTimeoutTickTime the minimum precision for the acknowledge timeout
+ * @param ackTimeoutTickTime the minimum precision for the acknowledgement timeout
* messages tracker
* @return the consumer builder instance
* @see ConsumerBuilder#ackTimeoutTickTime(long, TimeUnit)
@@ -528,7 +528,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
* Sets a dead letter policy for the consumer.
*
* <p>
- * By default messages are redelivered indefinitely if they are not acknowledged. By
+ * By default, messages are redelivered indefinitely if they are not acknowledged. By
* using a dead letter mechanism, messages that have reached the max redelivery count
* will be acknowledged automatically and send to the configured dead letter topic.
*
@@ -539,7 +539,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
* .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build())
* .build();
* </pre> Default the dead letter topic name is {TopicName}-{Subscription}-DLQ. You
- * can set o set a custom dead letter topic name like this: <pre>
+ * can set a custom dead letter topic name like this: <pre>
* client.messageConsumer(Schema.BYTES)
* .deadLetterPolicy(DeadLetterPolicy
* .builder()
@@ -604,7 +604,7 @@ public interface ReactiveMessageConsumerBuilder<T> {
}
/**
- * Sets the maximum total receiver queue size across partitons.
+ * Sets the maximum total receiver queue size across partitions.
*
* <p>
* This setting is used to reduce the receiver queue size for individual partitions
@@ -685,8 +685,8 @@ public interface ReactiveMessageConsumerBuilder<T> {
* </pre> Buffering large number of outstanding uncompleted chunked messages can
* create memory pressure. It can be guarded by providing a
* {@code maxPendingChunkedMessage} threshold. Once the consumer reaches this
- * threshold, it drops the outstanding unchunked-messages by silently acknowledging or
- * asking the broker to redeliver later by marking it unacknowledged. This behavior
+ * threshold, it drops the outstanding non-chunked messages by silently acknowledging
+ * or asking the broker to redeliver later by marking it unacknowledged. This behavior
* can be controlled by setting {@link #autoAckOldestChunkedMessageOnQueueFull} The
* default value is 10.
* @param maxPendingChunkedMessage the maximum pending chunked messages.
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
index 9a9d248..882a0ed 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactivePulsarClient.java
@@ -19,8 +19,8 @@ package org.apache.pulsar.reactive.client.api;
import org.apache.pulsar.client.api.Schema;
/**
- * Apache Pulsar Reactive Client interface
- *
+ * Apache Pulsar Reactive Client interface.
+ * <p>
* Contains methods to create builders for {@link ReactiveMessageSender},
* {@link ReactiveMessageReader} and {@link ReactiveMessageConsumer} instances.
*/
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java
index 95285b2..38a3ac3 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/ApiImplementationFactory.java
@@ -53,7 +53,7 @@ public final class ApiImplementationFactory {
* @return the result of the message processing
*/
public static <T> MessageResult<T> negativeAcknowledge(MessageId messageId, T value) {
- return new DefaultMessageResult<T>(messageId, false, value);
+ return new DefaultMessageResult<>(messageId, false, value);
}
/**
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
index 7910116..eb19e99 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultMessageSpecBuilder.java
@@ -139,7 +139,7 @@ class DefaultMessageSpecBuilder<T> implements MessageSpecBuilder<T> {
@Override
public MessageSpec<T> build() {
- return new DefaultMessageSpec<T>(this.key, this.orderingKey, this.keyBytes, this.value, this.properties,
+ return new DefaultMessageSpec<>(this.key, this.orderingKey, this.keyBytes, this.value, this.properties,
this.eventTime, this.sequenceId, this.replicationClusters, this.disableReplication, this.deliverAt,
this.deliverAfterDelay, this.deliverAfterUnit);
}
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java
index 5e30171..638b116 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.java
@@ -29,7 +29,7 @@ import reactor.util.concurrent.Queues;
/**
* Functions for implementing In-order parallel processing for Pulsar messages using
* Project Reactor.
- *
+ * <p>
* A processing group is resolved for each message based on the message's key. The message
* flux is split into group fluxes based on the processing group. Each group flux is
* processes messages in order (one-by-one). Multiple group fluxes are processed in
diff --git a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
index 206a6f3..b7e35d7 100644
--- a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
+++ b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/MessageSpecTest.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
import org.junit.jupiter.api.Test;
@@ -101,7 +100,7 @@ class MessageSpecTest {
private Duration deliverAfter;
@Override
- public MessageId send() throws PulsarClientException {
+ public MessageId send() {
throw new IllegalStateException("not implemented");
}