You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/16 02:46:48 UTC

[pulsar] branch master updated: Fix StructSchema reader cache loading logic

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0367f5f  Fix StructSchema reader cache loading logic
0367f5f is described below

commit 0367f5f0a236b26cfec74b272ca44a4c77eceda1
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Aug 16 10:46:42 2019 +0800

    Fix StructSchema reader cache loading logic
    
    ### Motivation
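    StructSchema uses a LoadingCache to cache schema readers, but the cache key is a byte[]. Java arrays are hashed and compared by identity (object address) rather than by content, so lookups never hit and every decode creates a new reader. We should therefore change the key type of the LoadingCache to BytesSchemaVersion, which compares by content.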
---
 .../java/org/apache/pulsar/client/impl/schema/AvroSchema.java |  9 +++++----
 .../java/org/apache/pulsar/client/impl/schema/JSONSchema.java |  3 ++-
 .../org/apache/pulsar/client/impl/schema/ProtobufSchema.java  |  3 ++-
 .../org/apache/pulsar/client/impl/schema/StructSchema.java    | 11 ++++++-----
 .../pulsar/client/impl/schema/generic/GenericAvroSchema.java  | 11 ++++++-----
 .../pulsar/client/impl/schema/generic/GenericJsonSchema.java  | 11 ++++++-----
 6 files changed, 27 insertions(+), 21 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 4f4ae5a..9eddee5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.client.impl.schema.reader.AvroReader;
 import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.slf4j.Logger;
@@ -99,16 +100,16 @@ public class AvroSchema<T> extends StructSchema<T> {
     }
 
     @Override
-    protected SchemaReader<T> loadReader(byte[] schemaVersion) {
-        SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion);
+    protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
+        SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
         if (schemaInfo != null) {
             log.info("Load schema reader for version({}), schema is : {}",
-                SchemaUtils.getStringSchemaVersion(schemaVersion),
+                SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
                 schemaInfo.getSchemaDefinition());
             return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema);
         } else {
             log.warn("No schema found for version({}), use latest schema : {}",
-                SchemaUtils.getStringSchemaVersion(schemaVersion),
+                SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
                 this.schemaInfo.getSchemaDefinition());
             return reader;
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 04b249c..90a288e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.client.impl.schema.reader.JsonReader;
 import org.apache.pulsar.client.impl.schema.writer.JsonWriter;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -58,7 +59,7 @@ public class JSONSchema<T> extends StructSchema<T> {
     }
 
     @Override
-    protected SchemaReader<T> loadReader(byte[] schemaVersion) {
+    protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
         throw new RuntimeException("JSONSchema don't support schema versioning");
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index b40f2ef..23dde95 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.client.impl.schema.reader.ProtobufReader;
 import org.apache.pulsar.client.impl.schema.writer.ProtobufWriter;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -94,7 +95,7 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex
     }
 
     @Override
-    protected SchemaReader<T> loadReader(byte[] schemaVersion) {
+    protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
         throw new RuntimeException("ProtobufSchema don't support schema versioning");
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index 4ec9fc8..a6608f4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.client.api.schema.SchemaWriter;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.slf4j.Logger;
@@ -60,10 +61,10 @@ public abstract class StructSchema<T> implements Schema<T> {
     protected SchemaReader<T> reader;
     protected SchemaWriter<T> writer;
     protected SchemaInfoProvider schemaInfoProvider;
-    private final LoadingCache<byte[], SchemaReader<T>> readerCache = CacheBuilder.newBuilder().maximumSize(100000)
-            .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<byte[], SchemaReader<T>>() {
+    private final LoadingCache<BytesSchemaVersion, SchemaReader<T>> readerCache = CacheBuilder.newBuilder().maximumSize(100000)
+            .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<BytesSchemaVersion, SchemaReader<T>>() {
                 @Override
-                public SchemaReader<T> load(byte[] schemaVersion) {
+                public SchemaReader<T> load(BytesSchemaVersion schemaVersion) {
                     return loadReader(schemaVersion);
                 }
             });
@@ -90,7 +91,7 @@ public abstract class StructSchema<T> implements Schema<T> {
     @Override
     public T decode(byte[] bytes, byte[] schemaVersion) {
         try {
-            return readerCache.get(schemaVersion).read(bytes);
+            return readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
         } catch (ExecutionException e) {
             LOG.error("Can't get generic schema for topic {} schema version {}",
                     schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
@@ -138,7 +139,7 @@ public abstract class StructSchema<T> implements Schema<T> {
      * @param schemaVersion the provided schema version
      * @return the schema reader for decoding messages encoded by the provided schema version.
      */
-    protected abstract SchemaReader<T> loadReader(byte[] schemaVersion);
+    protected abstract SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion);
 
     /**
      * TODO: think about how to make this async
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index fab70d4..91220b4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
 import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.client.impl.schema.SchemaUtils;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
@@ -54,21 +55,21 @@ public class GenericAvroSchema extends GenericSchemaImpl {
     }
 
     @Override
-    protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
-         SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion);
+    protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
+         SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
          if (schemaInfo != null) {
              log.info("Load schema reader for version({}), schema is : {}",
-                 SchemaUtils.getStringSchemaVersion(schemaVersion),
+                 SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
                  schemaInfo);
              Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
              Schema readerSchema = useProvidedSchemaAsReaderSchema ? schema : writerSchema;
              return new GenericAvroReader(
                      writerSchema,
                      readerSchema,
-                     schemaVersion);
+                     schemaVersion.get());
          } else {
              log.warn("No schema found for version({}), use latest schema : {}",
-                 SchemaUtils.getStringSchemaVersion(schemaVersion),
+                 SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
                  this.schemaInfo);
              return reader;
          }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
index 307edd5..7bde670 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
 import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.client.impl.schema.SchemaUtils;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
@@ -47,11 +48,11 @@ class GenericJsonSchema extends GenericSchemaImpl {
     }
 
     @Override
-    protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
-        SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion);
+    protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
+        SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
         if (schemaInfo != null) {
             log.info("Load schema reader for version({}), schema is : {}",
-                SchemaUtils.getStringSchemaVersion(schemaVersion),
+                SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
                 schemaInfo.getSchemaDefinition());
             Schema readerSchema;
             if (useProvidedSchemaAsReaderSchema) {
@@ -59,14 +60,14 @@ class GenericJsonSchema extends GenericSchemaImpl {
             } else {
                 readerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
             }
-            return new GenericJsonReader(schemaVersion,
+            return new GenericJsonReader(schemaVersion.get(),
                     readerSchema.getFields()
                             .stream()
                             .map(f -> new Field(f.name(), f.pos()))
                             .collect(Collectors.toList()));
         } else {
             log.warn("No schema found for version({}), use latest schema : {}",
-                SchemaUtils.getStringSchemaVersion(schemaVersion),
+                SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
                 this.schemaInfo.getSchemaDefinition());
             return reader;
         }