You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by cb...@apache.org on 2022/12/03 21:39:03 UTC
[pulsar-client-reactive] branch main updated: Add InflightLimiter parameters to producer cache key (#82)
This is an automated email from the ASF dual-hosted git repository.
cbornet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 2c95893 Add InflightLimiter parameters to producer cache key (#82)
2c95893 is described below
commit 2c95893879c37b7edb1c00be04a7c41ca170238f
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sat Dec 3 23:38:59 2022 +0200
Add InflightLimiter parameters to producer cache key (#82)
Fixes #80
---
.../adapter/AdaptedReactiveMessageSender.java | 7 ++--
.../AdaptedReactiveMessageSenderBuilder.java | 42 ++++++++++++++++++++--
.../client/internal/adapter/ProducerCacheKey.java | 11 ++++--
.../internal/adapter/ReactiveProducerAdapter.java | 7 ++--
.../adapter/ReactiveProducerAdapterFactory.java | 6 ++--
5 files changed, 62 insertions(+), 11 deletions(-)
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
index 0471b7c..7ed0062 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
@@ -49,15 +49,18 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
private final Supplier<PublisherTransformer> producerActionTransformer;
+ private final Object producerActionTransformerKey;
+
AdaptedReactiveMessageSender(Schema<T> schema, ReactiveMessageSenderSpec senderSpec, int maxConcurrency,
ReactiveProducerAdapterFactory reactiveProducerAdapterFactory, ProducerCache producerCache,
- Supplier<PublisherTransformer> producerActionTransformer) {
+ Supplier<PublisherTransformer> producerActionTransformer, Object producerActionTransformerKey) {
this.schema = schema;
this.senderSpec = senderSpec;
this.maxConcurrency = maxConcurrency;
this.reactiveProducerAdapterFactory = reactiveProducerAdapterFactory;
this.producerCache = producerCache;
this.producerActionTransformer = producerActionTransformer;
+ this.producerActionTransformerKey = producerActionTransformerKey;
}
ReactiveProducerAdapter<T> createReactiveProducerAdapter() {
@@ -65,7 +68,7 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
ProducerBuilder<T> producerBuilder = pulsarClient.newProducer(this.schema);
configureProducerBuilder(producerBuilder);
return producerBuilder;
- }, this.producerCache, this.producerActionTransformer);
+ }, this.producerCache, this.producerActionTransformer, this.producerActionTransformerKey);
}
private void configureProducerBuilder(ProducerBuilder<T> producerBuilder) {
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
index 29c41af..18fb53a 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
@@ -16,6 +16,7 @@
package org.apache.pulsar.reactive.client.internal.adapter;
+import java.util.Objects;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.Schema;
@@ -96,17 +97,54 @@ class AdaptedReactiveMessageSenderBuilder<T> implements ReactiveMessageSenderBui
@Override
public ReactiveMessageSender<T> build() {
+ Object producerActionTransformerKey;
if (this.maxInflight > 0) {
this.producerActionTransformer = () -> new InflightLimiter(this.maxInflight,
Math.max(this.maxInflight / 2, 1), Schedulers.single(), this.maxConcurrentSenderSubscriptions);
+ producerActionTransformerKey = new ProducerActionTransformerKey(this.maxInflight,
+ this.maxConcurrentSenderSubscriptions);
+ }
+ else {
+ producerActionTransformerKey = null;
}
return new AdaptedReactiveMessageSender<>(this.schema, this.senderSpec, resolveMaxConcurrency(),
- this.reactiveProducerAdapterFactory, (ProducerCache) this.producerCache,
- this.producerActionTransformer);
+ this.reactiveProducerAdapterFactory, (ProducerCache) this.producerCache, this.producerActionTransformer,
+ producerActionTransformerKey);
}
private int resolveMaxConcurrency() {
return Math.min(Math.max(MAX_CONCURRENCY_LOWER_BOUND, this.maxInflight / 2), MAX_CONCURRENCY_UPPER_BOUND);
}
+ private static class ProducerActionTransformerKey {
+
+ private final int maxInflight;
+
+ private final int maxConcurrentSenderSubscriptions;
+
+ ProducerActionTransformerKey(int maxInflight, int maxConcurrentSenderSubscriptions) {
+ this.maxInflight = maxInflight;
+ this.maxConcurrentSenderSubscriptions = maxConcurrentSenderSubscriptions;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ProducerActionTransformerKey that = (ProducerActionTransformerKey) o;
+ return this.maxInflight == that.maxInflight
+ && this.maxConcurrentSenderSubscriptions == that.maxConcurrentSenderSubscriptions;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(this.maxInflight, this.maxConcurrentSenderSubscriptions);
+ }
+
+ }
+
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
index 5427b72..a36ebbb 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
@@ -30,11 +30,14 @@ final class ProducerCacheKey {
private final Schema<?> schema;
+ private final Object producerActionTransformerKey;
+
ProducerCacheKey(final PulsarClient pulsarClient, final ProducerConfigurationData producerConfigurationData,
- final Schema<?> schema) {
+ final Schema<?> schema, Object producerActionTransformerKey) {
this.pulsarClient = pulsarClient;
this.producerConfigurationData = producerConfigurationData;
this.schema = schema;
+ this.producerActionTransformerKey = producerActionTransformerKey;
}
@Override
@@ -48,12 +51,14 @@ final class ProducerCacheKey {
ProducerCacheKey that = (ProducerCacheKey) o;
return (Objects.equals(this.pulsarClient, that.pulsarClient)
&& Objects.equals(this.producerConfigurationData, that.producerConfigurationData)
- && Objects.equals(this.schema, that.schema));
+ && Objects.equals(this.schema, that.schema))
+ && Objects.equals(this.producerActionTransformerKey, that.producerActionTransformerKey);
}
@Override
public int hashCode() {
- return Objects.hash(this.pulsarClient, this.producerConfigurationData, this.schema);
+ return Objects.hash(this.pulsarClient, this.producerConfigurationData, this.schema,
+ this.producerActionTransformerKey);
}
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
index 1ecacf8..a99ff9c 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
@@ -40,13 +40,16 @@ class ReactiveProducerAdapter<T> {
private final Supplier<PublisherTransformer> producerActionTransformer;
+ private final Object producerActionTransformerKey;
+
ReactiveProducerAdapter(Supplier<PulsarClient> pulsarClientSupplier,
Function<PulsarClient, ProducerBuilder<T>> producerBuilderFactory, ProducerCache producerCache,
- Supplier<PublisherTransformer> producerActionTransformer) {
+ Supplier<PublisherTransformer> producerActionTransformer, Object producerActionTransformerKey) {
this.pulsarClientSupplier = pulsarClientSupplier;
this.producerBuilderFactory = producerBuilderFactory;
this.producerCache = producerCache;
this.producerActionTransformer = producerActionTransformer;
+ this.producerActionTransformerKey = producerActionTransformerKey;
}
private Mono<Producer<T>> createProducerMono() {
@@ -60,7 +63,7 @@ class ReactiveProducerAdapter<T> {
ProducerBuilderImpl<T> producerBuilder = (ProducerBuilderImpl<T>) this.producerBuilderFactory
.apply(pulsarClient);
ProducerCacheKey cacheKey = new ProducerCacheKey(pulsarClient, producerBuilder.getConf().clone(),
- producerBuilder.getSchema());
+ producerBuilder.getSchema(), this.producerActionTransformerKey);
return Tuples.of(cacheKey, AdapterImplementationFactory.adaptPulsarFuture(producerBuilder::createAsync));
});
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapterFactory.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapterFactory.java
index ff33d92..286b84d 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapterFactory.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapterFactory.java
@@ -32,9 +32,11 @@ class ReactiveProducerAdapterFactory {
}
<T> ReactiveProducerAdapter<T> create(Function<PulsarClient, ProducerBuilder<T>> producerBuilderFactory,
- ProducerCache producerCache, Supplier<PublisherTransformer> producerActionTransformer) {
+ ProducerCache producerCache, Supplier<PublisherTransformer> producerActionTransformer,
+ Object producerActionTransformerKey) {
return new ReactiveProducerAdapter<>(this.pulsarClientSupplier, producerBuilderFactory, producerCache,
- (producerActionTransformer != null) ? producerActionTransformer : PublisherTransformer::identity);
+ (producerActionTransformer != null) ? producerActionTransformer : PublisherTransformer::identity,
+ producerActionTransformerKey);
}
}