You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by cb...@apache.org on 2022/12/03 21:39:03 UTC

[pulsar-client-reactive] branch main updated: Add InflightLimiter parameters to producer cache key (#82)

This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c95893  Add InflightLimiter parameters to producer cache key (#82)
2c95893 is described below

commit 2c95893879c37b7edb1c00be04a7c41ca170238f
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sat Dec 3 23:38:59 2022 +0200

    Add InflightLimiter parameters to producer cache key (#82)
    
    Fixes #80
---
 .../adapter/AdaptedReactiveMessageSender.java      |  7 ++--
 .../AdaptedReactiveMessageSenderBuilder.java       | 42 ++++++++++++++++++++--
 .../client/internal/adapter/ProducerCacheKey.java  | 11 ++++--
 .../internal/adapter/ReactiveProducerAdapter.java  |  7 ++--
 .../adapter/ReactiveProducerAdapterFactory.java    |  6 ++--
 5 files changed, 62 insertions(+), 11 deletions(-)

diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
index 0471b7c..7ed0062 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
@@ -49,15 +49,18 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
 
 	private final Supplier<PublisherTransformer> producerActionTransformer;
 
+	private final Object producerActionTransformerKey;
+
 	AdaptedReactiveMessageSender(Schema<T> schema, ReactiveMessageSenderSpec senderSpec, int maxConcurrency,
 			ReactiveProducerAdapterFactory reactiveProducerAdapterFactory, ProducerCache producerCache,
-			Supplier<PublisherTransformer> producerActionTransformer) {
+			Supplier<PublisherTransformer> producerActionTransformer, Object producerActionTransformerKey) {
 		this.schema = schema;
 		this.senderSpec = senderSpec;
 		this.maxConcurrency = maxConcurrency;
 		this.reactiveProducerAdapterFactory = reactiveProducerAdapterFactory;
 		this.producerCache = producerCache;
 		this.producerActionTransformer = producerActionTransformer;
+		this.producerActionTransformerKey = producerActionTransformerKey;
 	}
 
 	ReactiveProducerAdapter<T> createReactiveProducerAdapter() {
@@ -65,7 +68,7 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
 			ProducerBuilder<T> producerBuilder = pulsarClient.newProducer(this.schema);
 			configureProducerBuilder(producerBuilder);
 			return producerBuilder;
-		}, this.producerCache, this.producerActionTransformer);
+		}, this.producerCache, this.producerActionTransformer, this.producerActionTransformerKey);
 	}
 
 	private void configureProducerBuilder(ProducerBuilder<T> producerBuilder) {
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
index 29c41af..18fb53a 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
@@ -16,6 +16,7 @@
 
 package org.apache.pulsar.reactive.client.internal.adapter;
 
+import java.util.Objects;
 import java.util.function.Supplier;
 
 import org.apache.pulsar.client.api.Schema;
@@ -96,17 +97,54 @@ class AdaptedReactiveMessageSenderBuilder<T> implements ReactiveMessageSenderBui
 
 	@Override
 	public ReactiveMessageSender<T> build() {
+		Object producerActionTransformerKey;
 		if (this.maxInflight > 0) {
 			this.producerActionTransformer = () -> new InflightLimiter(this.maxInflight,
 					Math.max(this.maxInflight / 2, 1), Schedulers.single(), this.maxConcurrentSenderSubscriptions);
+			producerActionTransformerKey = new ProducerActionTransformerKey(this.maxInflight,
+					this.maxConcurrentSenderSubscriptions);
+		}
+		else {
+			producerActionTransformerKey = null;
 		}
 		return new AdaptedReactiveMessageSender<>(this.schema, this.senderSpec, resolveMaxConcurrency(),
-				this.reactiveProducerAdapterFactory, (ProducerCache) this.producerCache,
-				this.producerActionTransformer);
+				this.reactiveProducerAdapterFactory, (ProducerCache) this.producerCache, this.producerActionTransformer,
+				producerActionTransformerKey);
 	}
 
 	private int resolveMaxConcurrency() {
 		return Math.min(Math.max(MAX_CONCURRENCY_LOWER_BOUND, this.maxInflight / 2), MAX_CONCURRENCY_UPPER_BOUND);
 	}
 
+	private static class ProducerActionTransformerKey {
+
+		private final int maxInflight;
+
+		private final int maxConcurrentSenderSubscriptions;
+
+		ProducerActionTransformerKey(int maxInflight, int maxConcurrentSenderSubscriptions) {
+			this.maxInflight = maxInflight;
+			this.maxConcurrentSenderSubscriptions = maxConcurrentSenderSubscriptions;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			ProducerActionTransformerKey that = (ProducerActionTransformerKey) o;
+			return this.maxInflight == that.maxInflight
+					&& this.maxConcurrentSenderSubscriptions == that.maxConcurrentSenderSubscriptions;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(this.maxInflight, this.maxConcurrentSenderSubscriptions);
+		}
+
+	}
+
 }
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
index 5427b72..a36ebbb 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
@@ -30,11 +30,14 @@ final class ProducerCacheKey {
 
 	private final Schema<?> schema;
 
+	private final Object producerActionTransformerKey;
+
 	ProducerCacheKey(final PulsarClient pulsarClient, final ProducerConfigurationData producerConfigurationData,
-			final Schema<?> schema) {
+			final Schema<?> schema, Object producerActionTransformerKey) {
 		this.pulsarClient = pulsarClient;
 		this.producerConfigurationData = producerConfigurationData;
 		this.schema = schema;
+		this.producerActionTransformerKey = producerActionTransformerKey;
 	}
 
 	@Override
@@ -48,12 +51,14 @@ final class ProducerCacheKey {
 		ProducerCacheKey that = (ProducerCacheKey) o;
 		return (Objects.equals(this.pulsarClient, that.pulsarClient)
 				&& Objects.equals(this.producerConfigurationData, that.producerConfigurationData)
-				&& Objects.equals(this.schema, that.schema));
+				&& Objects.equals(this.schema, that.schema))
+				&& Objects.equals(this.producerActionTransformerKey, that.producerActionTransformerKey);
 	}
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(this.pulsarClient, this.producerConfigurationData, this.schema);
+		return Objects.hash(this.pulsarClient, this.producerConfigurationData, this.schema,
+				this.producerActionTransformerKey);
 	}
 
 }
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
index 1ecacf8..a99ff9c 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java
@@ -40,13 +40,16 @@ class ReactiveProducerAdapter<T> {
 
 	private final Supplier<PublisherTransformer> producerActionTransformer;
 
+	private final Object producerActionTransformerKey;
+
 	ReactiveProducerAdapter(Supplier<PulsarClient> pulsarClientSupplier,
 			Function<PulsarClient, ProducerBuilder<T>> producerBuilderFactory, ProducerCache producerCache,
-			Supplier<PublisherTransformer> producerActionTransformer) {
+			Supplier<PublisherTransformer> producerActionTransformer, Object producerActionTransformerKey) {
 		this.pulsarClientSupplier = pulsarClientSupplier;
 		this.producerBuilderFactory = producerBuilderFactory;
 		this.producerCache = producerCache;
 		this.producerActionTransformer = producerActionTransformer;
+		this.producerActionTransformerKey = producerActionTransformerKey;
 	}
 
 	private Mono<Producer<T>> createProducerMono() {
@@ -60,7 +63,7 @@ class ReactiveProducerAdapter<T> {
 			ProducerBuilderImpl<T> producerBuilder = (ProducerBuilderImpl<T>) this.producerBuilderFactory
 					.apply(pulsarClient);
 			ProducerCacheKey cacheKey = new ProducerCacheKey(pulsarClient, producerBuilder.getConf().clone(),
-					producerBuilder.getSchema());
+					producerBuilder.getSchema(), this.producerActionTransformerKey);
 			return Tuples.of(cacheKey, AdapterImplementationFactory.adaptPulsarFuture(producerBuilder::createAsync));
 		});
 	}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapterFactory.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapterFactory.java
index ff33d92..286b84d 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapterFactory.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapterFactory.java
@@ -32,9 +32,11 @@ class ReactiveProducerAdapterFactory {
 	}
 
 	<T> ReactiveProducerAdapter<T> create(Function<PulsarClient, ProducerBuilder<T>> producerBuilderFactory,
-			ProducerCache producerCache, Supplier<PublisherTransformer> producerActionTransformer) {
+			ProducerCache producerCache, Supplier<PublisherTransformer> producerActionTransformer,
+			Object producerActionTransformerKey) {
 		return new ReactiveProducerAdapter<>(this.pulsarClientSupplier, producerBuilderFactory, producerCache,
-				(producerActionTransformer != null) ? producerActionTransformer : PublisherTransformer::identity);
+				(producerActionTransformer != null) ? producerActionTransformer : PublisherTransformer::identity,
+				producerActionTransformerKey);
 	}
 
 }