You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/21 11:26:57 UTC

[pulsar] branch master updated: Add a cache of versioned KeyValueSchemaImpl (#15122)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 93331d376e0 Add a cache of versioned KeyValueSchemaImpl (#15122)
93331d376e0 is described below

commit 93331d376e09a7cd90906b64dbdc99dbf3befc2e
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Thu Apr 21 13:26:50 2022 +0200

    Add a cache of versioned KeyValueSchemaImpl (#15122)
---
 .../client/impl/schema/KeyValueSchemaImpl.java     | 27 +++++++++++++++++-----
 .../client/impl/schema/KeyValueSchemaTest.java     | 22 ++++++++++++++++++
 2 files changed, 43 insertions(+), 6 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java
index 486b365a17d..f3b2c234f44 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java
@@ -21,7 +21,9 @@ package org.apache.pulsar.client.impl.schema;
 import static com.google.common.base.Preconditions.checkArgument;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
@@ -48,6 +50,8 @@ public class KeyValueSchemaImpl<K, V> extends AbstractSchema<KeyValue<K, V>> imp
 
     private final KeyValueEncodingType keyValueEncodingType;
 
+    private final Map<SchemaVersion, Schema<?>> schemaMap = new ConcurrentHashMap<>();
+
     // schemaInfo combined by KeySchemaInfo and ValueSchemaInfo:
     //   [keyInfo.length][keyInfo][valueInfo.length][ValueInfo]
     private SchemaInfo schemaInfo;
@@ -292,14 +296,25 @@ public class KeyValueSchemaImpl<K, V> extends AbstractSchema<KeyValue<K, V>> imp
         if (!supportSchemaVersioning()) {
             return this;
         } else {
-            Schema<?> keySchema = this.keySchema instanceof AbstractSchema ? ((AbstractSchema) this.keySchema)
-                    .atSchemaVersion(schemaVersion) : this.keySchema;
-            Schema<?> valueSchema = this.valueSchema instanceof AbstractSchema ? ((AbstractSchema) this.valueSchema)
-                    .atSchemaVersion(schemaVersion) : this.valueSchema;
-            return KeyValueSchemaImpl.of(keySchema, valueSchema, keyValueEncodingType);
+            if (schemaVersion == null) {
+                return internalAtSchemaVersion(null);
+            }
+            return schemaMap.computeIfAbsent(
+                    BytesSchemaVersion.of(schemaVersion),
+                    __ -> internalAtSchemaVersion(schemaVersion));
         }
     }
 
+    private Schema<?> internalAtSchemaVersion(byte[] schemaVersion) { // builds an uncached versioned copy
+        Schema<?> keySchema = this.keySchema instanceof AbstractSchema // only AbstractSchema is re-versioned
+                ? ((AbstractSchema) this.keySchema).atSchemaVersion(schemaVersion)
+                : this.keySchema; // any other key schema is reused unchanged
+        Schema<?> valueSchema = this.valueSchema instanceof AbstractSchema // same rule for the value side
+                ? ((AbstractSchema) this.valueSchema).atSchemaVersion(schemaVersion)
+                : this.valueSchema;
+        return KeyValueSchemaImpl.of(keySchema, valueSchema, keyValueEncodingType); // keeps the encoding type
+    }
+
     /**
      * Get the Schema of the Key.
      * @return the Schema of the Key
@@ -354,7 +369,7 @@ public class KeyValueSchemaImpl<K, V> extends AbstractSchema<KeyValue<K, V>> imp
             throw new SchemaSerializationException("Can't get accurate schema information for " + topicName + " "
                     + "using KeyValueSchemaImpl because SchemaInfoProvider is not set yet");
         } else {
-            SchemaInfo schemaInfo = null;
+            SchemaInfo schemaInfo;
             try {
                 schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion.bytes()).get();
                 if (schemaInfo == null) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index bf02c816979..6dfbd8bfb9c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -18,7 +18,11 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
@@ -400,4 +404,22 @@ public class KeyValueSchemaTest {
                 AutoConsumeSchema.getSchema(keyValueSchema.getSchemaInfo());
         assertEquals(keyValueSchema.getKeyValueEncodingType(), keyValueSchema2.getKeyValueEncodingType());
     }
+
+    @Test
+    public void testKeyValueSchemaCache() { // same version requested twice must yield one cached instance
+        Schema<Foo> keySchema = spy(Schema.AVRO(Foo.class)); // spies let the test count delegated calls
+        Schema<Foo> valueSchema = spy(Schema.AVRO(Foo.class));
+        KeyValueSchemaImpl<Foo, Foo> keyValueSchema = (KeyValueSchemaImpl<Foo,Foo>)
+                KeyValueSchemaImpl.of(keySchema, valueSchema, KeyValueEncodingType.SEPARATED);
+
+        KeyValueSchemaImpl<Foo, Foo> schema1 =
+                (KeyValueSchemaImpl<Foo, Foo>) keyValueSchema.atSchemaVersion(new byte[0]);
+        KeyValueSchemaImpl<Foo, Foo> schema2 = // a second, separate byte[] with the same (empty) content
+                (KeyValueSchemaImpl<Foo, Foo>) keyValueSchema.atSchemaVersion(new byte[0]);
+
+        assertSame(schema1, schema2); // identity, not equality: the second call must return the first result
+
+        verify(((AbstractSchema)keySchema), times(1)).atSchemaVersion(new byte[0]); // delegated only once
+        verify(((AbstractSchema)valueSchema), times(1)).atSchemaVersion(new byte[0]);
+    }
 }