You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2023/11/06 07:58:32 UTC

(pulsar-client-reactive) branch main updated: Fix producer cache key for JSON schema (#137)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new fd75610  Fix producer cache key for JSON schema (#137)
fd75610 is described below

commit fd75610797dd8f1808a76d0a7a002da3c2c28e4a
Author: Chris Bono <cb...@vmware.com>
AuthorDate: Mon Nov 6 01:58:27 2023 -0600

    Fix producer cache key for JSON schema (#137)
    
    The key used to cache producers includes the producer schema.
    This works well for primitive schema types and for complex
    schema types when the schema is the exact same instance. However, the
    schema contract does not enforce equals/hashCode. This results in cases
    where 2 producers using the same type of schema are not matching in the
    cache (eg. Schema.JSON(<some-class>)). This commit fixes this by using
    the schema hash for matching rather than the schema itself.
---
 .../client/internal/adapter/ProducerCacheKey.java  |  15 ++-
 .../internal/adapter/ProducerCacheKeyTests.java    | 112 ++++++++++++++++++
 .../CaffeineShadedProducerCacheProviderTest.java   | 126 ++++++++++----------
 .../CaffeineProducerCacheProviderTest.java         | 127 +++++++++++----------
 4 files changed, 259 insertions(+), 121 deletions(-)

diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
index 3cda05e..f2571dc 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKey.java
@@ -24,6 +24,7 @@ import java.util.Objects;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.protocol.schema.SchemaHash;
 
 final class ProducerCacheKey {
 
@@ -33,6 +34,8 @@ final class ProducerCacheKey {
 
 	private final Schema<?> schema;
 
+	private final SchemaHash schemaHash;
+
 	private final Object producerActionTransformerKey;
 
 	ProducerCacheKey(final PulsarClient pulsarClient, final ProducerConfigurationData producerConfigurationData,
@@ -40,6 +43,7 @@ final class ProducerCacheKey {
 		this.pulsarClient = pulsarClient;
 		this.producerConfigurationData = producerConfigurationData;
 		this.schema = schema;
+		this.schemaHash = (this.schema != null) ? SchemaHash.of(this.schema) : null;
 		this.producerActionTransformerKey = producerActionTransformerKey;
 	}
 
@@ -54,14 +58,21 @@ final class ProducerCacheKey {
 		ProducerCacheKey that = (ProducerCacheKey) o;
 		return (Objects.equals(this.pulsarClient, that.pulsarClient)
 				&& Objects.equals(this.producerConfigurationData, that.producerConfigurationData)
-				&& Objects.equals(this.schema, that.schema))
+				&& Objects.equals(this.schemaHash, that.schemaHash))
 				&& Objects.equals(this.producerActionTransformerKey, that.producerActionTransformerKey);
 	}
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(this.pulsarClient, this.producerConfigurationData, this.schema,
+		return Objects.hash(this.pulsarClient, this.producerConfigurationData, this.schemaHash,
 				this.producerActionTransformerKey);
 	}
 
+	@Override
+	public String toString() {
+		return "ProducerCacheKey{" + "pulsarClient=" + this.pulsarClient + ", producerConfigurationData="
+				+ this.producerConfigurationData + ", schema=" + this.schema + ", producerActionTransformerKey="
+				+ this.producerActionTransformerKey + "}";
+	}
+
 }
diff --git a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKeyTests.java b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKeyTests.java
new file mode 100644
index 0000000..2cb0c8a
--- /dev/null
+++ b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheKeyTests.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.reactive.client.internal.adapter;
+
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link ProducerCacheKey}.
+ */
+class ProducerCacheKeyTests {
+
+	@Test
+	void matchShouldTakeIntoAccountPulsarClientField() {
+		PulsarClient client = mock(PulsarClient.class);
+		ProducerConfigurationData data = new ProducerConfigurationData();
+		ProducerCacheKey cacheKey = new ProducerCacheKey(client, data, Schema.STRING, null);
+		assertThat(cacheKey).isEqualTo(new ProducerCacheKey(client, data, Schema.STRING, null));
+		assertThat(cacheKey).isNotEqualTo(new ProducerCacheKey(mock(PulsarClient.class), data, Schema.STRING, null));
+	}
+
+	@Test
+	void matchShouldTakeIntoAccountProducerConfigDataField() {
+		PulsarClient client = mock(PulsarClient.class);
+		ProducerConfigurationData data1 = new ProducerConfigurationData();
+		data1.setTopicName("foo");
+		ProducerConfigurationData data2 = new ProducerConfigurationData();
+		data2.setTopicName("foo");
+		ProducerCacheKey cacheKey = new ProducerCacheKey(client, data1, Schema.STRING, null);
+		assertThat(cacheKey).isEqualTo(new ProducerCacheKey(client, data1, Schema.STRING, null));
+		assertThat(cacheKey).isEqualTo(new ProducerCacheKey(client, data2, Schema.STRING, null));
+		data2.setTopicName("bar");
+		assertThat(cacheKey).isNotEqualTo(new ProducerCacheKey(client, data2, Schema.STRING, null));
+	}
+
+	@Test
+	void matchShouldTakeIntoAccountSchemaField() {
+		PulsarClient client = mock(PulsarClient.class);
+		ProducerConfigurationData data = new ProducerConfigurationData();
+		assertThat(new ProducerCacheKey(client, data, Schema.STRING, null))
+				.isEqualTo(new ProducerCacheKey(client, data, Schema.STRING, null));
+		assertThat(new ProducerCacheKey(client, data, Schema.JSON(Foo.class), null))
+				.isEqualTo(new ProducerCacheKey(client, data, Schema.JSON(Foo.class), null));
+		assertThat(new ProducerCacheKey(client, data, Schema.JSON(Foo.class), null))
+				.isNotEqualTo(new ProducerCacheKey(client, data, Schema.JSON(Bar.class), null));
+		assertThat(new ProducerCacheKey(client, data, Schema.STRING, null))
+				.isNotEqualTo(new ProducerCacheKey(client, data, Schema.JSON(Foo.class), null));
+	}
+
+	@Test
+	void matchShouldTakeIntoAccountTransformerKeyField() {
+		PulsarClient client = mock(PulsarClient.class);
+		ProducerConfigurationData data = new ProducerConfigurationData();
+		assertThat(new ProducerCacheKey(client, data, Schema.STRING, null))
+				.isEqualTo(new ProducerCacheKey(client, data, Schema.STRING, null));
+		assertThat(new ProducerCacheKey(client, data, Schema.STRING, "a"))
+				.isEqualTo(new ProducerCacheKey(client, data, Schema.STRING, "a"));
+		assertThat(new ProducerCacheKey(client, data, Schema.STRING, "a"))
+				.isNotEqualTo(new ProducerCacheKey(client, data, Schema.STRING, "b"));
+	}
+
+	static class Foo {
+
+		private String id;
+
+		Foo(String id) {
+			this.id = id;
+		}
+
+		String getId() {
+			return this.id;
+		}
+
+	}
+
+	static class Bar {
+
+		private String id;
+
+		Bar(String id) {
+			this.id = id;
+		}
+
+		String getId() {
+			return this.id;
+		}
+
+	}
+
+}
diff --git a/pulsar-client-reactive-producer-cache-caffeine-shaded/src/test/java/org/apache/pulsar/reactive/client/producercache/CaffeineShadedProducerCacheProviderTest.java b/pulsar-client-reactive-producer-cache-caffeine-shaded/src/test/java/org/apache/pulsar/reactive/client/producercache/CaffeineShadedProducerCacheProviderTest.java
index 8f7518b..79f6b08 100644
--- a/pulsar-client-reactive-producer-cache-caffeine-shaded/src/test/java/org/apache/pulsar/reactive/client/producercache/CaffeineShadedProducerCacheProviderTest.java
+++ b/pulsar-client-reactive-producer-cache-caffeine-shaded/src/test/java/org/apache/pulsar/reactive/client/producercache/CaffeineShadedProducerCacheProviderTest.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ProducerBase;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
 import org.apache.pulsar.reactive.client.api.MessageSpec;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
@@ -57,42 +58,17 @@ class CaffeineShadedProducerCacheProviderTest {
 	void cacheProvider(String name, CaffeineShadedProducerCacheProvider cacheProvider) throws Exception {
 		PulsarClientImpl pulsarClient = spy(
 				(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
-
-		ProducerBase<String> producer = mock(ProducerBase.class);
-		doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
-		doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
-		doReturn(true).when(producer).isConnected();
-		TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
-				new TypedMessageBuilderImpl<>(producer, Schema.STRING));
-		doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
-
-		doReturn(typedMessageBuilder).when(producer).newMessage();
-
-		doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
-				eq(Schema.STRING), isNull());
-
-		ProducerBase<Integer> producer2 = mock(ProducerBase.class);
-		doReturn(CompletableFuture.completedFuture(null)).when(producer2).closeAsync();
-		doReturn(CompletableFuture.completedFuture(null)).when(producer2).flushAsync();
-		doReturn(true).when(producer2).isConnected();
-		TypedMessageBuilderImpl<Integer> typedMessageBuilder2 = spy(
-				new TypedMessageBuilderImpl<>(producer2, Schema.INT32));
-		doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder2).sendAsync();
-
-		doReturn(typedMessageBuilder2).when(producer2).newMessage();
-
-		doReturn(CompletableFuture.completedFuture(producer2)).when(pulsarClient).createProducerAsync(any(),
-				eq(Schema.INT32), isNull());
-
+		setupMockProducerForSchema(Schema.STRING, pulsarClient);
+		setupMockProducerForSchema(Schema.INT32, pulsarClient);
 		ReactiveMessageSenderCache cache = AdaptedReactivePulsarClientFactory.createCache(cacheProvider);
 
+		// Send N string messages (should only create producer for string messages once)
 		ReactiveMessageSender<String> sender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
-				.messageSender(Schema.STRING).topic("my-topic").cache(cache).build();
-
+				.messageSender(Schema.STRING).topic("my-topic-str").cache(cache).build();
 		sender.sendOne(MessageSpec.of("a")).then(sender.sendOne(MessageSpec.of("b")))
 				.thenMany(Flux.just(MessageSpec.of("c")).as(sender::sendMany)).blockLast(Duration.ofSeconds(5));
 
-		verify(pulsarClient).createProducerAsync(any(), any(), isNull());
+		verify(pulsarClient, times(1)).createProducerAsync(any(), eq(Schema.STRING), isNull());
 	}
 
 	private static Stream<Arguments> cacheProvider() {
@@ -102,6 +78,36 @@ class CaffeineShadedProducerCacheProviderTest {
 				.stream();
 	}
 
+	@Test
+	void complexTypesAreCachedProperly() throws Exception {
+		// Because !Schema.JSON(Foo.class).equals(Schema.JSON(Foo.class))..
+		// Ensure that two separate senders using the same JSON schema type are cached
+		// properly
+		PulsarClientImpl pulsarClient = spy(
+				(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
+		setupMockProducerForSchema(Schema.JSON(TestMessage.class), pulsarClient);
+		ReactiveMessageSenderCache cache = AdaptedReactivePulsarClientFactory
+				.createCache(new CaffeineShadedProducerCacheProvider());
+
+		// Send N JSON messages across 2 senders w/ same schema type (should only create 1
+		// producer)
+		ReactiveMessageSender<TestMessage> jsonSender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
+				.messageSender(Schema.JSON(TestMessage.class)).topic("my-topic-json").cache(cache).build();
+		ReactiveMessageSender<TestMessage> jsonSender2 = AdaptedReactivePulsarClientFactory.create(pulsarClient)
+				.messageSender(Schema.JSON(TestMessage.class)).topic("my-topic-json").cache(cache).build();
+
+		jsonSender.sendOne(MessageSpec.of(new TestMessage("a")))
+				.then(jsonSender.sendOne(MessageSpec.of(new TestMessage("b"))))
+				.thenMany(Flux.just(MessageSpec.of(new TestMessage("c"))).as(jsonSender::sendMany))
+				.blockLast(Duration.ofSeconds(5));
+		jsonSender2.sendOne(MessageSpec.of(new TestMessage("a")))
+				.then(jsonSender.sendOne(MessageSpec.of(new TestMessage("b"))))
+				.thenMany(Flux.just(MessageSpec.of(new TestMessage("c"))).as(jsonSender::sendMany))
+				.blockLast(Duration.ofSeconds(5));
+
+		verify(pulsarClient, times(1)).createProducerAsync(any(), any(JSONSchema.class), isNull());
+	}
+
 	@Test
 	void loadedByServiceLoader() {
 		ReactiveMessageSenderCache cache = AdaptedReactivePulsarClientFactory.createCache();
@@ -112,48 +118,48 @@ class CaffeineShadedProducerCacheProviderTest {
 	void caffeinePropsAreRespected() throws Exception {
 		PulsarClientImpl pulsarClient = spy(
 				(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
-
-		ProducerBase<String> producer = mock(ProducerBase.class);
-		doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
-		doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
-		doReturn(true).when(producer).isConnected();
-		TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
-				new TypedMessageBuilderImpl<>(producer, Schema.STRING));
-		doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
-
-		doReturn(typedMessageBuilder).when(producer).newMessage();
-
-		doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
-				eq(Schema.STRING), isNull());
-
-		ProducerBase<Integer> producer2 = mock(ProducerBase.class);
-		doReturn(CompletableFuture.completedFuture(null)).when(producer2).closeAsync();
-		doReturn(CompletableFuture.completedFuture(null)).when(producer2).flushAsync();
-		doReturn(true).when(producer2).isConnected();
-		TypedMessageBuilderImpl<Integer> typedMessageBuilder2 = spy(
-				new TypedMessageBuilderImpl<>(producer2, Schema.INT32));
-		doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder2).sendAsync();
-
-		doReturn(typedMessageBuilder2).when(producer2).newMessage();
-
-		doReturn(CompletableFuture.completedFuture(producer2)).when(pulsarClient).createProducerAsync(any(),
-				eq(Schema.INT32), isNull());
+		setupMockProducerForSchema(Schema.STRING, pulsarClient);
+		setupMockProducerForSchema(Schema.INT32, pulsarClient);
 
 		CaffeineShadedProducerCacheProvider cacheProvider = new CaffeineShadedProducerCacheProvider(
 				Duration.ofMinutes(1), Duration.ofMillis(100), 100L, 50);
 		ReactiveMessageSenderCache cache = AdaptedReactivePulsarClientFactory.createCache(cacheProvider);
-
 		ReactiveMessageSender<String> sender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
 				.messageSender(Schema.STRING).topic("my-topic").cache(cache).build();
 
 		sender.sendOne(MessageSpec.of("a")).then(sender.sendOne(MessageSpec.of("b")))
 				.thenMany(Flux.just(MessageSpec.of("c")).as(sender::sendMany)).blockLast(Duration.ofSeconds(5));
-
 		Thread.sleep(101);
-
 		sender.sendOne(MessageSpec.of("d")).block(Duration.ofSeconds(5));
-
 		verify(pulsarClient, times(2)).createProducerAsync(any(), any(), isNull());
 	}
 
+	private <T> void setupMockProducerForSchema(Schema<T> schema, PulsarClientImpl pulsarClient) {
+		ProducerBase<T> producer = mock(ProducerBase.class);
+		doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+		doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
+		doReturn(true).when(producer).isConnected();
+		TypedMessageBuilderImpl<T> typedMessageBuilder = spy(new TypedMessageBuilderImpl<>(producer, schema));
+		doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
+		doReturn(typedMessageBuilder).when(producer).newMessage();
+		doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+				any(schema.getClass()), isNull());
+	}
+
+	static class TestMessage {
+
+		private final String id;
+
+		TestMessage(String id) {
+			this.id = id;
+		}
+
+		// CHECKSTYLE:OFF
+		public String getId() {
+			return this.id;
+		}
+		// CHECKSTYLE:ON
+
+	}
+
 }
diff --git a/pulsar-client-reactive-producer-cache-caffeine/src/test/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProviderTest.java b/pulsar-client-reactive-producer-cache-caffeine/src/test/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProviderTest.java
index eb8bbbb..668387d 100644
--- a/pulsar-client-reactive-producer-cache-caffeine/src/test/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProviderTest.java
+++ b/pulsar-client-reactive-producer-cache-caffeine/src/test/java/org/apache/pulsar/reactive/client/producercache/CaffeineProducerCacheProviderTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ProducerBase;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
 import org.apache.pulsar.reactive.client.api.MessageSpec;
 import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
@@ -59,42 +60,17 @@ class CaffeineProducerCacheProviderTest {
 	void cacheProvider(String name, CaffeineProducerCacheProvider cacheProvider) throws Exception {
 		PulsarClientImpl pulsarClient = spy(
 				(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
-
-		ProducerBase<String> producer = mock(ProducerBase.class);
-		doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
-		doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
-		doReturn(true).when(producer).isConnected();
-		TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
-				new TypedMessageBuilderImpl<>(producer, Schema.STRING));
-		doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
-
-		doReturn(typedMessageBuilder).when(producer).newMessage();
-
-		doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
-				eq(Schema.STRING), isNull());
-
-		ProducerBase<Integer> producer2 = mock(ProducerBase.class);
-		doReturn(CompletableFuture.completedFuture(null)).when(producer2).closeAsync();
-		doReturn(CompletableFuture.completedFuture(null)).when(producer2).flushAsync();
-		doReturn(true).when(producer2).isConnected();
-		TypedMessageBuilderImpl<Integer> typedMessageBuilder2 = spy(
-				new TypedMessageBuilderImpl<>(producer2, Schema.INT32));
-		doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder2).sendAsync();
-
-		doReturn(typedMessageBuilder2).when(producer2).newMessage();
-
-		doReturn(CompletableFuture.completedFuture(producer2)).when(pulsarClient).createProducerAsync(any(),
-				eq(Schema.INT32), isNull());
-
+		setupMockProducerForSchema(Schema.STRING, pulsarClient);
+		setupMockProducerForSchema(Schema.INT32, pulsarClient);
 		ReactiveMessageSenderCache cache = AdaptedReactivePulsarClientFactory.createCache(cacheProvider);
 
+		// Send N string messages (should only create producer for string messages once)
 		ReactiveMessageSender<String> sender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
-				.messageSender(Schema.STRING).topic("my-topic").cache(cache).build();
-
+				.messageSender(Schema.STRING).topic("my-topic-str").cache(cache).build();
 		sender.sendOne(MessageSpec.of("a")).then(sender.sendOne(MessageSpec.of("b")))
 				.thenMany(Flux.just(MessageSpec.of("c")).as(sender::sendMany)).blockLast(Duration.ofSeconds(5));
 
-		verify(pulsarClient).createProducerAsync(any(), any(), isNull());
+		verify(pulsarClient, times(1)).createProducerAsync(any(), eq(Schema.STRING), isNull());
 	}
 
 	private static Stream<Arguments> cacheProvider() {
@@ -109,6 +85,38 @@ class CaffeineProducerCacheProviderTest {
 				.stream();
 	}
 
+	@Test
+	void complexTypesAreCachedProperly() throws Exception {
+		// Because !Schema.JSON(Foo.class).equals(Schema.JSON(Foo.class))..
+		// Ensure that two separate senders using the same JSON schema type are cached
+		// properly
+		PulsarClientImpl pulsarClient = spy(
+				(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
+		setupMockProducerForSchema(Schema.JSON(TestMessage.class), pulsarClient);
+		ReactiveMessageSenderCache cache = AdaptedReactivePulsarClientFactory
+				.createCache(new CaffeineProducerCacheProvider());
+
+		// Send N JSON messages across 2 senders w/ same schema type (should only create 1
+		// producer)
+		ReactiveMessageSender<TestMessage> jsonSender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
+				.messageSender(Schema.JSON(TestMessage.class)).topic("my-topic-json").cache(cache).build();
+
+		ReactiveMessageSender<TestMessage> jsonSender2 = AdaptedReactivePulsarClientFactory.create(pulsarClient)
+				.messageSender(Schema.JSON(TestMessage.class)).topic("my-topic-json").cache(cache).build();
+
+		jsonSender.sendOne(MessageSpec.of(new TestMessage("a")))
+				.then(jsonSender.sendOne(MessageSpec.of(new TestMessage("b"))))
+				.thenMany(Flux.just(MessageSpec.of(new TestMessage("c"))).as(jsonSender::sendMany))
+				.blockLast(Duration.ofSeconds(5));
+
+		jsonSender2.sendOne(MessageSpec.of(new TestMessage("a")))
+				.then(jsonSender.sendOne(MessageSpec.of(new TestMessage("b"))))
+				.thenMany(Flux.just(MessageSpec.of(new TestMessage("c"))).as(jsonSender::sendMany))
+				.blockLast(Duration.ofSeconds(5));
+
+		verify(pulsarClient, times(1)).createProducerAsync(any(), any(JSONSchema.class), isNull());
+	}
+
 	@Test
 	void loadedByServiceLoader() {
 		ReactiveMessageSenderCache cache = AdaptedReactivePulsarClientFactory.createCache();
@@ -119,48 +127,49 @@ class CaffeineProducerCacheProviderTest {
 	void caffeinePropsAreRespected() throws Exception {
 		PulsarClientImpl pulsarClient = spy(
 				(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
-
-		ProducerBase<String> producer = mock(ProducerBase.class);
-		doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
-		doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
-		doReturn(true).when(producer).isConnected();
-		TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
-				new TypedMessageBuilderImpl<>(producer, Schema.STRING));
-		doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
-
-		doReturn(typedMessageBuilder).when(producer).newMessage();
-
-		doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
-				eq(Schema.STRING), isNull());
-
-		ProducerBase<Integer> producer2 = mock(ProducerBase.class);
-		doReturn(CompletableFuture.completedFuture(null)).when(producer2).closeAsync();
-		doReturn(CompletableFuture.completedFuture(null)).when(producer2).flushAsync();
-		doReturn(true).when(producer2).isConnected();
-		TypedMessageBuilderImpl<Integer> typedMessageBuilder2 = spy(
-				new TypedMessageBuilderImpl<>(producer2, Schema.INT32));
-		doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder2).sendAsync();
-
-		doReturn(typedMessageBuilder2).when(producer2).newMessage();
-
-		doReturn(CompletableFuture.completedFuture(producer2)).when(pulsarClient).createProducerAsync(any(),
-				eq(Schema.INT32), isNull());
+		setupMockProducerForSchema(Schema.STRING, pulsarClient);
+		setupMockProducerForSchema(Schema.INT32, pulsarClient);
 
 		CaffeineProducerCacheProvider cacheProvider = new CaffeineProducerCacheProvider(
 				Caffeine.newBuilder().expireAfterWrite(Duration.ofMillis(100)).maximumSize(100));
 		ReactiveMessageSenderCache cache = AdaptedReactivePulsarClientFactory.createCache(cacheProvider);
-
 		ReactiveMessageSender<String> sender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
 				.messageSender(Schema.STRING).topic("my-topic").cache(cache).build();
 
 		sender.sendOne(MessageSpec.of("a")).then(sender.sendOne(MessageSpec.of("b")))
 				.thenMany(Flux.just(MessageSpec.of("c")).as(sender::sendMany)).blockLast(Duration.ofSeconds(5));
-
 		Thread.sleep(101);
-
 		sender.sendOne(MessageSpec.of("d")).block(Duration.ofSeconds(5));
 
 		verify(pulsarClient, times(2)).createProducerAsync(any(), any(), isNull());
 	}
 
+	private <T> void setupMockProducerForSchema(Schema<T> schema, PulsarClientImpl pulsarClient) {
+		ProducerBase<T> producer = mock(ProducerBase.class);
+		doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+		doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
+		doReturn(true).when(producer).isConnected();
+		TypedMessageBuilderImpl<T> typedMessageBuilder = spy(new TypedMessageBuilderImpl<>(producer, schema));
+		doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
+		doReturn(typedMessageBuilder).when(producer).newMessage();
+		doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+				any(schema.getClass()), isNull());
+	}
+
+	static class TestMessage {
+
+		private final String id;
+
+		TestMessage(String id) {
+			this.id = id;
+		}
+
+		// CHECKSTYLE:OFF
+		public String getId() {
+			return this.id;
+		}
+		// CHECKSTYLE:ON
+
+	}
+
 }