You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/11 13:54:16 UTC

[GitHub] [pulsar] cbornet opened a new pull request, #15122: Add a cache of versioned sub-schemas in KeyValueSchemaImpl

cbornet opened a new pull request, #15122:
URL: https://github.com/apache/pulsar/pull/15122

   <!--
   ### Contribution Checklist
     
     - PR title format should be *[type][component] summary*. For details, see *[Guideline - Pulsar PR Naming Convention](https://docs.google.com/document/d/1d8Pw6ZbWk-_pCKdOmdvx9rnhPiyuxwq60_TrD68d7BA/edit#heading=h.trs9rsex3xom)*. 
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   Fixes #15118
   
   ### Modifications
   
   Added ``keySchemaMap` and `valueSchemaMap` to cache results of `atSchemaVersion` in `KeyValueSchemaImpl `
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   
   This change added tests and can be verified as follows:
   * Run `KeyValueSchemaTest::testSchemaCaches`
   
   ### Does this pull request potentially affect one of the following parts:
   
   no
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `no-need-doc` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-added`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] cbornet commented on a diff in pull request #15122: Add a cache of versioned sub-schemas in KeyValueSchemaImpl

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #15122:
URL: https://github.com/apache/pulsar/pull/15122#discussion_r847392607


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java:
##########
@@ -292,14 +298,27 @@ public String toString() {
         if (!supportSchemaVersioning()) {
             return this;
         } else {
-            Schema<?> keySchema = this.keySchema instanceof AbstractSchema ? ((AbstractSchema) this.keySchema)
-                    .atSchemaVersion(schemaVersion) : this.keySchema;
-            Schema<?> valueSchema = this.valueSchema instanceof AbstractSchema ? ((AbstractSchema) this.valueSchema)
-                    .atSchemaVersion(schemaVersion) : this.valueSchema;
+            Schema<?> keySchema = getSchemaAtVersion(this.keySchema, keySchemaMap, schemaVersion);
+            Schema<?> valueSchema = getSchemaAtVersion(this.valueSchema, valueSchemaMap, schemaVersion);
             return KeyValueSchemaImpl.of(keySchema, valueSchema, keyValueEncodingType);
         }
     }
 
+    private Schema<?> getSchemaAtVersion(Schema<?> schema,
+                                         Map<SchemaVersion, Schema<?>> schemaMap,
+                                         byte[] schemaVersion) {
+        if (!(schema instanceof AbstractSchema)) {
+            return schema;
+        }
+        Objects.requireNonNull(schemaVersion);
+        SchemaVersion cacheSchemaVersion = BytesSchemaVersion.of(schemaVersion);
+        schemaMap.computeIfAbsent(

Review Comment:
   Indeed!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] cbornet commented on pull request #15122: Add a cache of versioned KeyValueSchemaImpl

Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #15122:
URL: https://github.com/apache/pulsar/pull/15122#issuecomment-1099628644

   > @cbornet - in that case, I don't think this PR should be labeled as fixing https://github.com/apache/pulsar/issues/15118. It seems you've identified two problems, and we should fix both.
   
   Well the intent of #15118 was to describe the issue I'm solving here. Maybe another issue can be opened for the other one
   
   > One question about your caching solution. It seems your solution does not expire cache entries. I am not familiar with schema management, but wouldn't it make sense to use a caffeine cache to prevent unbounded growth on the schemaMap map?
   
   Yes the map is unbounded. I believe there won't be thousands of versions of a schema but I could be wrong.
   There's already an unbounded cache of this type in AutoConsumeSchema
   
   https://github.com/apache/pulsar/blob/193f5b2f74e919c05d0df651981cef439f55472f/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java#L51
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #15122: Add a cache of versioned sub-schemas in KeyValueSchemaImpl

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #15122:
URL: https://github.com/apache/pulsar/pull/15122#discussion_r847375805


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java:
##########
@@ -292,14 +298,27 @@ public String toString() {
         if (!supportSchemaVersioning()) {
             return this;
         } else {
-            Schema<?> keySchema = this.keySchema instanceof AbstractSchema ? ((AbstractSchema) this.keySchema)
-                    .atSchemaVersion(schemaVersion) : this.keySchema;
-            Schema<?> valueSchema = this.valueSchema instanceof AbstractSchema ? ((AbstractSchema) this.valueSchema)
-                    .atSchemaVersion(schemaVersion) : this.valueSchema;
+            Schema<?> keySchema = getSchemaAtVersion(this.keySchema, keySchemaMap, schemaVersion);
+            Schema<?> valueSchema = getSchemaAtVersion(this.valueSchema, valueSchemaMap, schemaVersion);
             return KeyValueSchemaImpl.of(keySchema, valueSchema, keyValueEncodingType);
         }
     }
 
+    private Schema<?> getSchemaAtVersion(Schema<?> schema,
+                                         Map<SchemaVersion, Schema<?>> schemaMap,
+                                         byte[] schemaVersion) {
+        if (!(schema instanceof AbstractSchema)) {
+            return schema;
+        }
+        Objects.requireNonNull(schemaVersion);
+        SchemaVersion cacheSchemaVersion = BytesSchemaVersion.of(schemaVersion);
+        schemaMap.computeIfAbsent(

Review Comment:
   what about "return schemaMap.computeIfAbsent(" ...
   this way we save a "get"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] michaeljmarshall commented on pull request #15122: Add a cache of versioned KeyValueSchemaImpl

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on PR #15122:
URL: https://github.com/apache/pulsar/pull/15122#issuecomment-1096985919

   @cbornet - in that case, I don't think this PR should be labeled as fixing #15118. It seems you've identified two problems, and we should fix both.
   
   One question about your caching solution. It seems your solution does not expire cache entries. I am not familiar with schema management, but wouldn't it make sense to use a caffeine cache to prevent unbounded growth on the `schemaMap` map?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] cbornet commented on pull request #15122: Add a cache of versioned KeyValueSchemaImpl

Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #15122:
URL: https://github.com/apache/pulsar/pull/15122#issuecomment-1096295130

   The fix is not to prevent from decoding twice. It is to prevent from decoding each time we call `KeyValueSchemaImpl:: atSchemaVersion` which happens for instance when you call `getReaderSchema`. In some sinks we call `getReaderchema` on each received message so it's done a lot of times.
   There could be optimizations to prevent from decoding twice by optimizing `SchemaInfoProvider` but it's not the purpose of this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #15122: Add a cache of versioned sub-schemas in KeyValueSchemaImpl

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #15122:
URL: https://github.com/apache/pulsar/pull/15122#discussion_r847359502


##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java:
##########
@@ -400,4 +404,18 @@ public void testKeyValueSchemaSeparatedEncoding() {
                 AutoConsumeSchema.getSchema(keyValueSchema.getSchemaInfo());
         assertEquals(keyValueSchema.getKeyValueEncodingType(), keyValueSchema2.getKeyValueEncodingType());
     }
+
+    @Test
+    public void testSchemaCaches() {
+        Schema<Foo> keySchema = spy(Schema.AVRO(Foo.class));
+        Schema<Foo> valueSchema = spy(Schema.AVRO(Foo.class));
+        KeyValueSchemaImpl<Foo, Foo> keyValueSchema = (KeyValueSchemaImpl<Foo,Foo>)
+                KeyValueSchemaImpl.of(keySchema, valueSchema, KeyValueEncodingType.SEPARATED);
+
+        keyValueSchema.atSchemaVersion(new byte[0]);
+        keyValueSchema.atSchemaVersion(new byte[0]);

Review Comment:
   can we assert that we are returning the same object ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] cbornet commented on a diff in pull request #15122: Add a cache of versioned sub-schemas in KeyValueSchemaImpl

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #15122:
URL: https://github.com/apache/pulsar/pull/15122#discussion_r847392845


##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java:
##########
@@ -400,4 +404,18 @@ public void testKeyValueSchemaSeparatedEncoding() {
                 AutoConsumeSchema.getSchema(keyValueSchema.getSchemaInfo());
         assertEquals(keyValueSchema.getKeyValueEncodingType(), keyValueSchema2.getKeyValueEncodingType());
     }
+
+    @Test
+    public void testSchemaCaches() {
+        Schema<Foo> keySchema = spy(Schema.AVRO(Foo.class));
+        Schema<Foo> valueSchema = spy(Schema.AVRO(Foo.class));
+        KeyValueSchemaImpl<Foo, Foo> keyValueSchema = (KeyValueSchemaImpl<Foo,Foo>)
+                KeyValueSchemaImpl.of(keySchema, valueSchema, KeyValueEncodingType.SEPARATED);
+
+        keyValueSchema.atSchemaVersion(new byte[0]);
+        keyValueSchema.atSchemaVersion(new byte[0]);

Review Comment:
   Sure



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] cbornet commented on pull request #15122: Add a cache of versioned KeyValueSchemaImpl

Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #15122:
URL: https://github.com/apache/pulsar/pull/15122#issuecomment-1096300288

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari merged pull request #15122: Issue 15118: Add a cache of versioned KeyValueSchemaImpl

Posted by GitBox <gi...@apache.org>.
lhotari merged PR #15122:
URL: https://github.com/apache/pulsar/pull/15122


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org