You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/18 13:36:00 UTC

[pulsar] branch master updated: [Schema] Multi version schema reader implementation. (#8464)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 72d6d28  [Schema] Multi version schema reader implementation. (#8464)
72d6d28 is described below

commit 72d6d28a7ca7a89f24e7397cefcd8a2a67a4252e
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Nov 18 21:35:35 2020 +0800

    [Schema] Multi version schema reader implementation. (#8464)
    
    This PR in order to change decode multi version message by reader cache in schema to  multi version reader and user can custom reader to decode multi version message.
---
 .../pulsar/client/api/schema/SchemaReader.java     | 30 ++++++++
 .../client/impl/schema/AbstractStructSchema.java   | 73 ++-----------------
 .../client/impl/schema/AvroBaseStructSchema.java   | 49 +++++++++++++
 .../pulsar/client/impl/schema/AvroSchema.java      | 47 +++++-------
 .../pulsar/client/impl/schema/JSONSchema.java      | 11 ++-
 .../pulsar/client/impl/schema/ProtobufSchema.java  | 11 +--
 .../pulsar/client/impl/schema/StructSchema.java    |  2 +-
 .../generic/AbstractMultiVersionGenericReader.java | 43 +++++++++++
 .../impl/schema/generic/GenericAvroSchema.java     | 40 +++--------
 .../impl/schema/generic/GenericJsonSchema.java     | 38 +---------
 .../impl/schema/generic/GenericSchemaImpl.java     | 18 ++---
 .../generic/MultiVersionGenericAvroReader.java     | 65 +++++++++++++++++
 ...ema.java => MultiVersionGenericJsonReader.java} | 49 ++++++-------
 .../reader/AbstractMultiVersionAvroBaseReader.java | 35 +++++++++
 .../AbstractMultiVersionReader.java}               | 83 ++++++++--------------
 .../impl/schema/reader/MultiVersionAvroReader.java | 64 +++++++++++++++++
 .../{StructSchema.java => util/SchemaUtil.java}    | 62 ++++++----------
 .../pulsar/client/impl/schema/AvroSchemaTest.java  | 27 +++++++
 .../pulsar/io/influxdb/v2/InfluxDBSinkTest.java    |  4 +-
 .../pulsar/sql/presto/TestAvroSchemaHandler.java   |  6 +-
 20 files changed, 437 insertions(+), 320 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
index d4279dc..dc18c4e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
@@ -56,4 +56,34 @@ public interface SchemaReader<T> {
      * @return the serialized object
      */
     T read(InputStream inputStream);
+
+    /**
+     * Serialize bytes convert pojo.
+     *
+     * @param bytes the data
+     * @param schemaVersion the schema version of message
+     * @return the serialized object
+     */
+    default T read(byte[] bytes, byte[] schemaVersion) {
+        return read(bytes, 0, bytes.length);
+    }
+
+    /**
+     * serialize bytes convert pojo.
+     *
+     * @param inputStream the stream of message
+     * @param schemaVersion the schema version of message
+     * @return the serialized object
+     */
+    default T read(InputStream inputStream, byte[] schemaVersion) {
+        return read(inputStream);
+    }
+
+    /**
+     * Set schema info provider, this method support multi version reader.
+     *
+     * @param schemaInfoProvider the stream of message
+     */
+    default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
index 7730474..015ec4d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
@@ -18,26 +18,16 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-import org.apache.avro.AvroTypeException;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.lang3.SerializationException;
-import org.apache.pulsar.client.api.SchemaSerializationException;
+
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.client.api.schema.SchemaWriter;
-import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
 /**
  * minimal abstract StructSchema
  */
@@ -45,19 +35,11 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
 
     protected static final Logger LOG = LoggerFactory.getLogger(AbstractStructSchema.class);
 
-    protected SchemaInfo schemaInfo;
+    protected final SchemaInfo schemaInfo;
     protected SchemaReader<T> reader;
     protected SchemaWriter<T> writer;
     protected SchemaInfoProvider schemaInfoProvider;
 
-    LoadingCache<BytesSchemaVersion, SchemaReader<T>> readerCache = CacheBuilder.newBuilder().maximumSize(100000)
-            .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<BytesSchemaVersion, SchemaReader<T>>() {
-                @Override
-                public SchemaReader<T> load(BytesSchemaVersion schemaVersion) {
-                    return loadReader(schemaVersion);
-                }
-            });
-
     public AbstractStructSchema(SchemaInfo schemaInfo){
         this.schemaInfo = schemaInfo;
     }
@@ -75,17 +57,7 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
 
     @Override
     public T decode(byte[] bytes, byte[] schemaVersion) {
-        try {
-            return schemaVersion == null ? decode(bytes) :
-                    readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
-        } catch (ExecutionException | AvroTypeException e) {
-            if (e instanceof AvroTypeException) {
-                throw new SchemaSerializationException(e);
-            }
-            LOG.error("Can't get generic schema for topic {} schema version {}",
-                    schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
-            throw new RuntimeException("Can't get generic schema for topic " + schemaInfoProvider.getTopicName());
-        }
+        return reader.read(bytes, schemaVersion);
     }
 
     @Override
@@ -95,14 +67,7 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
 
     @Override
     public T decode(ByteBuf byteBuf, byte[] schemaVersion) {
-        try {
-            return schemaVersion == null ? decode(byteBuf) :
-                    readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(new ByteBufInputStream(byteBuf));
-        } catch (ExecutionException e) {
-            LOG.error("Can't get generic schema for topic {} schema version {}",
-                    schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
-            throw new RuntimeException("Can't get generic schema for topic " + schemaInfoProvider.getTopicName());
-        }
+        return reader.read(new ByteBufInputStream(byteBuf), schemaVersion);
     }
 
     @Override
@@ -112,34 +77,8 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
 
     @Override
     public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
-        this.schemaInfoProvider = schemaInfoProvider;
-    }
-
-    /**
-     * Load the schema reader for reading messages encoded by the given schema version.
-     *
-     * @param schemaVersion the provided schema version
-     * @return the schema reader for decoding messages encoded by the provided schema version.
-     */
-    protected abstract SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion);
-
-    /**
-     * TODO: think about how to make this async
-     */
-    protected SchemaInfo getSchemaInfoByVersion(byte[] schemaVersion) {
-        try {
-            return schemaInfoProvider.getSchemaByVersion(schemaVersion).get();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new SerializationException(
-                "Interrupted at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
-                e
-            );
-        } catch (ExecutionException e) {
-            throw new SerializationException(
-                "Failed at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
-                e.getCause()
-            );
+        if (reader != null) {
+            this.reader.setSchemaInfoProvider(schemaInfoProvider);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroBaseStructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroBaseStructSchema.java
new file mode 100644
index 0000000..b464731
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroBaseStructSchema.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.avro.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema;
+
+/**
+ * This is a base schema implementation for Avro Based `Struct` types.
+ * A struct type is used for presenting records (objects) which
+ * have multiple fields.
+ *
+ * <p>Currently Pulsar supports 3 `Struct` types -
+ * {@link org.apache.pulsar.common.schema.SchemaType#AVRO},
+ * {@link org.apache.pulsar.common.schema.SchemaType#JSON},
+ * and {@link org.apache.pulsar.common.schema.SchemaType#PROTOBUF}.
+ */
+public abstract class AvroBaseStructSchema<T> extends AbstractStructSchema<T>{
+
+    protected final Schema schema;
+
+    public AvroBaseStructSchema(SchemaInfo schemaInfo) {
+        super(schemaInfo);
+        this.schema = parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8));
+    }
+
+    public Schema getAvroSchema(){
+        return schema;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 3e35cb2..46227b8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl.schema;
 
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.avro.Conversions;
 import org.apache.avro.data.JodaTimeConversions;
 import org.apache.avro.data.TimeConversions;
@@ -26,9 +27,9 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.api.schema.SchemaReader;
-import org.apache.pulsar.client.impl.schema.reader.AvroReader;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
+import org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader;
 import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
-import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.slf4j.Logger;
@@ -36,11 +37,14 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.getJsr310ConversionEnabledFromSchemaInfo;
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaInfo;
+
 /**
  * An AVRO schema implementation.
  */
 @Slf4j
-public class AvroSchema<T> extends StructSchema<T> {
+public class AvroSchema<T> extends AvroBaseStructSchema<T> {
     private static final Logger LOG = LoggerFactory.getLogger(AvroSchema.class);
 
     private ClassLoader pojoClassLoader;
@@ -49,10 +53,16 @@ public class AvroSchema<T> extends StructSchema<T> {
         super(schemaInfo);
         this.pojoClassLoader = pojoClassLoader;
         boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
-        setReader(new AvroReader<>(schema, pojoClassLoader, jsr310ConversionEnabled));
+        setReader(new MultiVersionAvroReader<>(schema, pojoClassLoader, getJsr310ConversionEnabledFromSchemaInfo(schemaInfo)));
         setWriter(new AvroWriter<>(schema, jsr310ConversionEnabled));
     }
 
+    private AvroSchema(SchemaReader<T> reader, SchemaWriter<T> writer, SchemaInfo schemaInfo) {
+        super(schemaInfo);
+        setReader(reader);
+        setWriter(writer);
+    }
+
     @Override
     public boolean supportSchemaVersioning() {
         return true;
@@ -68,6 +78,10 @@ public class AvroSchema<T> extends StructSchema<T> {
     }
 
     public static <T> AvroSchema<T> of(SchemaDefinition<T> schemaDefinition) {
+        if (schemaDefinition.getSchemaReaderOpt().isPresent() && schemaDefinition.getSchemaWriterOpt().isPresent()) {
+            return new AvroSchema<>(schemaDefinition.getSchemaReaderOpt().get(),
+                    schemaDefinition.getSchemaWriterOpt().get(), parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
+        }
         ClassLoader pojoClassLoader = null;
         if (schemaDefinition.getPojo() != null) {
             pojoClassLoader = schemaDefinition.getPojo().getClassLoader();
@@ -88,31 +102,6 @@ public class AvroSchema<T> extends StructSchema<T> {
         return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO), pojoClassLoader);
     }
 
-    @Override
-    protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
-        SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
-        if (schemaInfo != null) {
-            log.info("Load schema reader for version({}), schema is : {}, schemaInfo: {}",
-                SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
-                schemaInfo.getSchemaDefinition(), schemaInfo.toString());
-            boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
-            return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema, pojoClassLoader, jsr310ConversionEnabled);
-        } else {
-            log.warn("No schema found for version({}), use latest schema : {}",
-                SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
-                this.schemaInfo.getSchemaDefinition());
-            return reader;
-        }
-    }
-
-    private static boolean getJsr310ConversionEnabledFromSchemaInfo(SchemaInfo schemaInfo) {
-        if (schemaInfo != null) {
-            return Boolean.parseBoolean(schemaInfo.getProperties()
-                    .getOrDefault(SchemaDefinitionBuilderImpl.JSR310_CONVERSION_ENABLED, "false"));
-        }
-        return false;
-    }
-
     public static void addLogicalTypeConversions(ReflectData reflectData, boolean jsr310ConversionEnabled) {
         reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
         reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 45651f3..4e3b874 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -25,22 +25,24 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.client.api.schema.SchemaWriter;
 import org.apache.pulsar.client.impl.schema.reader.JacksonJsonReader;
 import org.apache.pulsar.client.impl.schema.writer.JacksonJsonWriter;
-import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
 import java.util.Map;
 
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaInfo;
+
 /**
  * A schema implementation to deal with json data.
  */
 @Slf4j
-public class JSONSchema<T> extends StructSchema<T> {
+public class JSONSchema<T> extends AvroBaseStructSchema<T> {
     // Cannot use org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal() because it does not
     // return shaded version of object mapper
     private static final ThreadLocal<ObjectMapper> JSON_MAPPER = ThreadLocal.withInitial(() -> {
@@ -59,11 +61,6 @@ public class JSONSchema<T> extends StructSchema<T> {
         setReader(reader);
     }
 
-    @Override
-    protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
-        throw new RuntimeException("JSONSchema don't support schema versioning");
-    }
-
     /**
      * Implemented for backwards compatibility reasons
      * since the original schema generated by JSONSchema was based off the json schema standard
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index 23dde95..f7971eb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -22,14 +22,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.GeneratedMessageV3;
+
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+
 import org.apache.avro.protobuf.ProtobufData;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
-import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.client.impl.schema.reader.ProtobufReader;
 import org.apache.pulsar.client.impl.schema.writer.ProtobufWriter;
-import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -45,7 +45,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 /**
  * A schema implementation to deal with protobuf generated messages.
  */
-public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> extends StructSchema<T> {
+public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> extends AvroBaseStructSchema<T> {
 
     public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";
 
@@ -94,11 +94,6 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex
         }
     }
 
-    @Override
-    protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
-        throw new RuntimeException("ProtobufSchema don't support schema versioning");
-    }
-
     public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(Class<T> pojo) {
         return of(pojo, new HashMap<>());
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index 986e186..8cc1868 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -40,6 +40,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
  * {@link org.apache.pulsar.common.schema.SchemaType#JSON},
  * and {@link org.apache.pulsar.common.schema.SchemaType#PROTOBUF}.
  */
+@Deprecated
 public abstract class StructSchema<T> extends AbstractStructSchema<T> {
 
     protected final Schema schema;
@@ -47,7 +48,6 @@ public abstract class StructSchema<T> extends AbstractStructSchema<T> {
     protected StructSchema(SchemaInfo schemaInfo) {
         super(schemaInfo);
         this.schema = parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8));
-        this.schemaInfo = schemaInfo;
         if (schemaInfo.getProperties().containsKey(GenericAvroSchema.OFFSET_PROP)) {
             this.schema.addProp(GenericAvroSchema.OFFSET_PROP,
                     schemaInfo.getProperties().get(GenericAvroSchema.OFFSET_PROP));
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/AbstractMultiVersionGenericReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/AbstractMultiVersionGenericReader.java
new file mode 100644
index 0000000..58396a1
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/AbstractMultiVersionGenericReader.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import org.apache.avro.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionAvroBaseReader;
+
+/**
+ * The abstract class of multi version generic reader.
+ */
+public abstract class AbstractMultiVersionGenericReader extends AbstractMultiVersionAvroBaseReader<GenericRecord> {
+
+    // the flag controls whether to use the provided schema as reader schema
+    // to decode the messages. In `AUTO_CONSUME` mode, setting this flag to `false`
+    // allows decoding the messages using the schema associated with the messages.
+    protected final boolean useProvidedSchemaAsReaderSchema;
+
+    protected AbstractMultiVersionGenericReader(boolean useProvidedSchemaAsReaderSchema,
+                                                SchemaReader<GenericRecord> providerSchemaReader,
+                                                Schema readerSchema) {
+        super(providerSchemaReader, readerSchema);
+        this.useProvidedSchemaAsReaderSchema = useProvidedSchemaAsReaderSchema;
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index 94c5ba1..8f63328 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -19,12 +19,9 @@
 package org.apache.pulsar.client.impl.schema.generic;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
+
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
-import org.apache.pulsar.client.api.schema.SchemaReader;
-import org.apache.pulsar.client.impl.schema.SchemaUtils;
-import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
@@ -41,9 +38,14 @@ public class GenericAvroSchema extends GenericSchemaImpl {
 
     GenericAvroSchema(SchemaInfo schemaInfo,
                       boolean useProvidedSchemaAsReaderSchema) {
-        super(schemaInfo, useProvidedSchemaAsReaderSchema);
-        setReader(new GenericAvroReader(schema));
+        super(schemaInfo);
+        setReader(new MultiVersionGenericAvroReader(useProvidedSchemaAsReaderSchema, schema));
         setWriter(new GenericAvroWriter(schema));
+
+        if (schemaInfo.getProperties().containsKey(GenericAvroSchema.OFFSET_PROP)) {
+            this.schema.addProp(GenericAvroSchema.OFFSET_PROP,
+                    schemaInfo.getProperties().get(GenericAvroSchema.OFFSET_PROP));
+        }
     }
 
     @Override
@@ -59,34 +61,10 @@ public class GenericAvroSchema extends GenericSchemaImpl {
     @Override
     public org.apache.pulsar.client.api.Schema<GenericRecord> clone() {
         org.apache.pulsar.client.api.Schema<GenericRecord> schema =
-                GenericAvroSchema.of(schemaInfo, useProvidedSchemaAsReaderSchema);
+                GenericAvroSchema.of(schemaInfo, ((AbstractMultiVersionGenericReader) reader).useProvidedSchemaAsReaderSchema);
         if (schemaInfoProvider != null) {
             schema.setSchemaInfoProvider(schemaInfoProvider);
         }
         return schema;
     }
-
-    @Override
-    protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
-         SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
-         if (schemaInfo != null) {
-             log.info("Load schema reader for version({}), schema is : {}",
-                 SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
-                 schemaInfo);
-             Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
-             Schema readerSchema = useProvidedSchemaAsReaderSchema ? schema : writerSchema;
-             readerSchema.addProp(OFFSET_PROP, schemaInfo.getProperties().getOrDefault(OFFSET_PROP, "0"));
-
-             return new GenericAvroReader(
-                     writerSchema,
-                     readerSchema,
-                     schemaVersion.get());
-         } else {
-             log.warn("No schema found for version({}), use latest schema : {}",
-                 SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
-                 this.schemaInfo);
-             return reader;
-         }
-    }
-
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
index 8edae25..87a1790 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
@@ -18,16 +18,8 @@
  */
 package org.apache.pulsar.client.impl.schema.generic;
 
-import java.util.stream.Collectors;
-
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
-import org.apache.pulsar.client.api.schema.Field;
-import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
-import org.apache.pulsar.client.api.schema.SchemaReader;
-import org.apache.pulsar.client.impl.schema.SchemaUtils;
-import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
@@ -42,35 +34,9 @@ public class GenericJsonSchema extends GenericSchemaImpl {
 
     GenericJsonSchema(SchemaInfo schemaInfo,
                       boolean useProvidedSchemaAsReaderSchema) {
-        super(schemaInfo, useProvidedSchemaAsReaderSchema);
+        super(schemaInfo);
         setWriter(new GenericJsonWriter());
-        setReader(new GenericJsonReader(fields, schemaInfo));
-    }
-
-    @Override
-    protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
-        SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
-        if (schemaInfo != null) {
-            log.info("Load schema reader for version({}), schema is : {}",
-                SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
-                schemaInfo.getSchemaDefinition());
-            Schema readerSchema;
-            if (useProvidedSchemaAsReaderSchema) {
-                readerSchema = schema;
-            } else {
-                readerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
-            }
-            return new GenericJsonReader(schemaVersion.get(),
-                    readerSchema.getFields()
-                            .stream()
-                            .map(f -> new Field(f.name(), f.pos()))
-                            .collect(Collectors.toList()), schemaInfo);
-        } else {
-            log.warn("No schema found for version({}), use latest schema : {}",
-                SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
-                this.schemaInfo.getSchemaDefinition());
-            return reader;
-        }
+        setReader(new MultiVersionGenericJsonReader(useProvidedSchemaAsReaderSchema, schema, schemaInfo, fields));
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
index 426bb7a..59e748c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.impl.schema.generic;
 import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
-import org.apache.pulsar.client.impl.schema.StructSchema;
+import org.apache.pulsar.client.impl.schema.AvroBaseStructSchema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 import java.util.List;
@@ -32,23 +32,17 @@ import java.util.stream.Collectors;
  * warning :
  * we suggest migrate GenericSchemaImpl.of() to  <GenericSchema Implementor>.of() method (e.g. GenericJsonSchema 、GenericAvroSchema )
  */
-public abstract class GenericSchemaImpl extends StructSchema<GenericRecord> implements GenericSchema<GenericRecord> {
+public abstract class GenericSchemaImpl extends AvroBaseStructSchema<GenericRecord> implements GenericSchema<GenericRecord> {
 
     protected final List<Field> fields;
-    // the flag controls whether to use the provided schema as reader schema
-    // to decode the messages. In `AUTO_CONSUME` mode, setting this flag to `false`
-    // allows decoding the messages using the schema associated with the messages.
-    protected final boolean useProvidedSchemaAsReaderSchema;
 
-    protected GenericSchemaImpl(SchemaInfo schemaInfo,
-                                boolean useProvidedSchemaAsReaderSchema) {
+    protected GenericSchemaImpl(SchemaInfo schemaInfo) {
         super(schemaInfo);
 
         this.fields = schema.getFields()
                 .stream()
                 .map(f -> new Field(f.name(), f.pos()))
                 .collect(Collectors.toList());
-        this.useProvidedSchemaAsReaderSchema = useProvidedSchemaAsReaderSchema;
     }
 
     @Override
@@ -69,9 +63,9 @@ public abstract class GenericSchemaImpl extends StructSchema<GenericRecord> impl
     /**
      * warning :
      * we suggest migrate GenericSchemaImpl.of() to  <GenericSchema Implementor>.of() method (e.g. GenericJsonSchema 、GenericAvroSchema )
-     * @param schemaInfo
-     * @param useProvidedSchemaAsReaderSchema
-     * @return
+     * @param schemaInfo {@link SchemaInfo}
+     * @param useProvidedSchemaAsReaderSchema {@link Boolean}
+     * @return generic schema implementation
      */
     public static GenericSchemaImpl of(SchemaInfo schemaInfo,
                                        boolean useProvidedSchemaAsReaderSchema) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericAvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericAvroReader.java
new file mode 100644
index 0000000..0c2f4d2
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericAvroReader.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import org.apache.avro.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.impl.schema.SchemaUtils;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema;
+
+/**
+ * A multi version generic avro reader.
+ */
+public class MultiVersionGenericAvroReader extends AbstractMultiVersionGenericReader {
+
+    public MultiVersionGenericAvroReader(boolean useProvidedSchemaAsReaderSchema, Schema readerSchema) {
+        super(useProvidedSchemaAsReaderSchema, new GenericAvroReader(readerSchema), readerSchema);
+    }
+
+    @Override
+    protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
+        SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
+        if (schemaInfo != null) {
+            LOG.info("Load schema reader for version({}), schema is : {}",
+                    SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
+                    schemaInfo);
+            Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
+            Schema readerSchema = useProvidedSchemaAsReaderSchema ? this.readerSchema : writerSchema;
+            readerSchema.addProp(GenericAvroSchema.OFFSET_PROP,
+                    schemaInfo.getProperties().getOrDefault(GenericAvroSchema.OFFSET_PROP, "0"));
+
+            return new GenericAvroReader(
+                    writerSchema,
+                    readerSchema,
+                    schemaVersion.get());
+        } else {
+            LOG.warn("No schema found for version({}), use latest schema : {}",
+                    SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
+                    this.readerSchema);
+            return providerSchemaReader;
+        }
+    }
+    protected static final Logger LOG = LoggerFactory.getLogger(MultiVersionGenericAvroReader.class);
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericJsonReader.java
similarity index 63%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericJsonReader.java
index 8edae25..daab882 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericJsonReader.java
@@ -18,45 +18,41 @@
  */
 package org.apache.pulsar.client.impl.schema.generic;
 
-import java.util.stream.Collectors;
-
-import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
 import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.client.impl.schema.SchemaUtils;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema;
 
 /**
- * A generic json schema.
+ * A multi version generic json reader.
  */
-@Slf4j
-public class GenericJsonSchema extends GenericSchemaImpl {
+public class MultiVersionGenericJsonReader extends AbstractMultiVersionGenericReader {
 
-    public GenericJsonSchema(SchemaInfo schemaInfo) {
-        this(schemaInfo, true);
-    }
-
-    GenericJsonSchema(SchemaInfo schemaInfo,
-                      boolean useProvidedSchemaAsReaderSchema) {
-        super(schemaInfo, useProvidedSchemaAsReaderSchema);
-        setWriter(new GenericJsonWriter());
-        setReader(new GenericJsonReader(fields, schemaInfo));
+    public MultiVersionGenericJsonReader(boolean useProvidedSchemaAsReaderSchema, Schema readerSchema,
+                                         SchemaInfo schemaInfo, List<Field> fields) {
+        super(useProvidedSchemaAsReaderSchema, new GenericJsonReader(fields, schemaInfo), readerSchema);
     }
 
     @Override
     protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
         SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
         if (schemaInfo != null) {
-            log.info("Load schema reader for version({}), schema is : {}",
-                SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
-                schemaInfo.getSchemaDefinition());
+            LOG.info("Load schema reader for version({}), schema is : {}",
+                    SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
+                    schemaInfo.getSchemaDefinition());
             Schema readerSchema;
             if (useProvidedSchemaAsReaderSchema) {
-                readerSchema = schema;
+                readerSchema = this.readerSchema;
             } else {
                 readerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
             }
@@ -66,15 +62,12 @@ public class GenericJsonSchema extends GenericSchemaImpl {
                             .map(f -> new Field(f.name(), f.pos()))
                             .collect(Collectors.toList()), schemaInfo);
         } else {
-            log.warn("No schema found for version({}), use latest schema : {}",
-                SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
-                this.schemaInfo.getSchemaDefinition());
-            return reader;
+            LOG.warn("No schema found for version({}), use latest schema : {}",
+                    SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
+                    this.readerSchema);
+            return providerSchemaReader;
         }
     }
 
-    @Override
-    public GenericRecordBuilder newRecordBuilder() {
-        throw new UnsupportedOperationException("Json Schema doesn't support record builder yet");
-    }
+    protected static final Logger LOG = LoggerFactory.getLogger(MultiVersionGenericAvroReader.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionAvroBaseReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionAvroBaseReader.java
new file mode 100644
index 0000000..b657e42
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionAvroBaseReader.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.reader;
+
+import org.apache.avro.Schema;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+
+/**
+ * The abstract class of multi version avro base reader.
+ */
+public abstract class AbstractMultiVersionAvroBaseReader<T> extends AbstractMultiVersionReader<T> {
+
+    protected Schema readerSchema;
+
+    public AbstractMultiVersionAvroBaseReader(SchemaReader<T> providerSchemaReader, Schema readerSchema) {
+        super(providerSchemaReader);
+        this.readerSchema = readerSchema;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionReader.java
similarity index 71%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionReader.java
index 7730474..75a6a5d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionReader.java
@@ -16,38 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl.schema;
+package org.apache.pulsar.client.impl.schema.reader;
 
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
 import org.apache.avro.AvroTypeException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.api.schema.SchemaReader;
-import org.apache.pulsar.client.api.schema.SchemaWriter;
+import org.apache.pulsar.client.impl.schema.SchemaUtils;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.InputStream;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
- * minimal abstract StructSchema
+ * The multi version reader abstract class, implement it will handle the multi version schema.
  */
-public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
+public abstract class AbstractMultiVersionReader<T> implements SchemaReader<T> {
 
-    protected static final Logger LOG = LoggerFactory.getLogger(AbstractStructSchema.class);
-
-    protected SchemaInfo schemaInfo;
-    protected SchemaReader<T> reader;
-    protected SchemaWriter<T> writer;
+    protected final SchemaReader<T> providerSchemaReader;
     protected SchemaInfoProvider schemaInfoProvider;
 
     LoadingCache<BytesSchemaVersion, SchemaReader<T>> readerCache = CacheBuilder.newBuilder().maximumSize(100000)
@@ -58,30 +53,26 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
                 }
             });
 
-    public AbstractStructSchema(SchemaInfo schemaInfo){
-        this.schemaInfo = schemaInfo;
+    AbstractMultiVersionReader(SchemaReader<T> providerSchemaReader) {
+        this.providerSchemaReader = providerSchemaReader;
     }
 
-
     @Override
-    public byte[] encode(T message) {
-        return writer.write(message);
+    public T read(byte[] bytes, int offset, int length) {
+        return providerSchemaReader.read(bytes);
     }
 
     @Override
-    public T decode(byte[] bytes) {
-        return reader.read(bytes);
+    public T read(InputStream inputStream) {
+        return providerSchemaReader.read(inputStream);
     }
 
     @Override
-    public T decode(byte[] bytes, byte[] schemaVersion) {
+    public T read(InputStream inputStream, byte[] schemaVersion) {
         try {
-            return schemaVersion == null ? decode(bytes) :
-                    readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
-        } catch (ExecutionException | AvroTypeException e) {
-            if (e instanceof AvroTypeException) {
-                throw new SchemaSerializationException(e);
-            }
+            return schemaVersion == null ? read(inputStream) :
+                    readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(inputStream);
+        } catch (ExecutionException e) {
             LOG.error("Can't get generic schema for topic {} schema version {}",
                     schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
             throw new RuntimeException("Can't get generic schema for topic " + schemaInfoProvider.getTopicName());
@@ -89,16 +80,14 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
     }
 
     @Override
-    public T decode(ByteBuf byteBuf) {
-        return reader.read(new ByteBufInputStream(byteBuf));
-    }
-
-    @Override
-    public T decode(ByteBuf byteBuf, byte[] schemaVersion) {
+    public T read(byte[] bytes, byte[] schemaVersion) {
         try {
-            return schemaVersion == null ? decode(byteBuf) :
-                    readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(new ByteBufInputStream(byteBuf));
-        } catch (ExecutionException e) {
+            return schemaVersion == null ? read(bytes) :
+                    readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
+        } catch (ExecutionException | AvroTypeException e) {
+            if (e instanceof AvroTypeException) {
+                throw new SchemaSerializationException(e);
+            }
             LOG.error("Can't get generic schema for topic {} schema version {}",
                     schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
             throw new RuntimeException("Can't get generic schema for topic " + schemaInfoProvider.getTopicName());
@@ -106,11 +95,6 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
     }
 
     @Override
-    public SchemaInfo getSchemaInfo() {
-        return this.schemaInfo;
-    }
-
-    @Override
     public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
         this.schemaInfoProvider = schemaInfoProvider;
     }
@@ -132,27 +116,16 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new SerializationException(
-                "Interrupted at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
-                e
+                    "Interrupted at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
+                    e
             );
         } catch (ExecutionException e) {
             throw new SerializationException(
-                "Failed at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
-                e.getCause()
+                    "Failed at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
+                    e.getCause()
             );
         }
     }
 
-    protected void setWriter(SchemaWriter<T> writer) {
-        this.writer = writer;
-    }
-
-    protected void setReader(SchemaReader<T> reader) {
-        this.reader = reader;
-    }
-
-    protected SchemaReader<T> getReader() {
-        return  reader;
-    }
-
+    protected static final Logger LOG = LoggerFactory.getLogger(AbstractMultiVersionReader.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/MultiVersionAvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/MultiVersionAvroReader.java
new file mode 100644
index 0000000..8f3e5cc
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/MultiVersionAvroReader.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.reader;
+
+import org.apache.avro.Schema;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.impl.schema.SchemaUtils;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.getJsr310ConversionEnabledFromSchemaInfo;
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema;
+
+/**
+ * A multi version avro reader.
+ */
+public class MultiVersionAvroReader<T> extends AbstractMultiVersionAvroBaseReader<T> {
+
+    private final ClassLoader pojoClassLoader;
+
+    public MultiVersionAvroReader(Schema readerSchema, ClassLoader pojoClassLoader, boolean jsr310ConversionEnabled) {
+        super(new AvroReader<>(readerSchema, pojoClassLoader, jsr310ConversionEnabled), readerSchema);
+        this.pojoClassLoader = pojoClassLoader;
+    }
+
+    @Override
+    protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
+        SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
+        if (schemaInfo != null) {
+            LOG.info("Load schema reader for version({}), schema is : {}, schemaInfo: {}",
+                    SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
+                    schemaInfo.getSchemaDefinition(), schemaInfo.toString());
+            boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
+            return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()),
+                    readerSchema, pojoClassLoader, jsr310ConversionEnabled);
+        } else {
+            LOG.warn("No schema found for version({}), use latest schema : {}",
+                    SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
+                    this.readerSchema);
+            return providerSchemaReader;
+        }
+    }
+
+    protected static final Logger LOG = LoggerFactory.getLogger(MultiVersionAvroReader.class);
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
similarity index 70%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
index 986e186..7e1e1c0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
@@ -16,13 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl.schema;
+package org.apache.pulsar.client.impl.schema.util;
 
 import org.apache.avro.Schema;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
-import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -30,36 +30,31 @@ import java.lang.reflect.Field;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-/**
- * This is a base schema implementation for Avro Based `Struct` types.
- * A struct type is used for presenting records (objects) which
- * have multiple fields.
- *
- * <p>Currently Pulsar supports 3 `Struct` types -
- * {@link org.apache.pulsar.common.schema.SchemaType#AVRO},
- * {@link org.apache.pulsar.common.schema.SchemaType#JSON},
- * and {@link org.apache.pulsar.common.schema.SchemaType#PROTOBUF}.
- */
-public abstract class StructSchema<T> extends AbstractStructSchema<T> {
-
-    protected final Schema schema;
+public class SchemaUtil {
 
-    protected StructSchema(SchemaInfo schemaInfo) {
-        super(schemaInfo);
-        this.schema = parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8));
-        this.schemaInfo = schemaInfo;
-        if (schemaInfo.getProperties().containsKey(GenericAvroSchema.OFFSET_PROP)) {
-            this.schema.addProp(GenericAvroSchema.OFFSET_PROP,
-                    schemaInfo.getProperties().get(GenericAvroSchema.OFFSET_PROP));
+    public static boolean getJsr310ConversionEnabledFromSchemaInfo(SchemaInfo schemaInfo) {
+        if (schemaInfo != null) {
+            return Boolean.parseBoolean(schemaInfo.getProperties()
+                    .getOrDefault(SchemaDefinitionBuilderImpl.JSR310_CONVERSION_ENABLED, "false"));
         }
+        return false;
     }
 
-    public Schema getAvroSchema() {
-        return schema;
+    public static Schema parseAvroSchema(String schemaJson) {
+        final Schema.Parser parser = new Schema.Parser();
+        parser.setValidateDefaults(false);
+        return parser.parse(schemaJson);
     }
 
+    public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
+        return SchemaInfo.builder()
+                .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
+                .properties(schemaDefinition.getProperties())
+                .name("")
+                .type(schemaType).build();
+    }
 
-    protected static Schema createAvroSchema(SchemaDefinition schemaDefinition) {
+    public static Schema createAvroSchema(SchemaDefinition schemaDefinition) {
         Class pojo = schemaDefinition.getPojo();
 
         if (StringUtils.isNotBlank(schemaDefinition.getJsonDef())) {
@@ -89,7 +84,7 @@ public abstract class StructSchema<T> extends AbstractStructSchema<T> {
         }
     }
 
-    protected static Schema extractAvroSchema(SchemaDefinition schemaDefinition, Class pojo) {
+    public static Schema extractAvroSchema(SchemaDefinition schemaDefinition, Class pojo) {
         try {
             return parseAvroSchema(pojo.getDeclaredField("SCHEMA$").get(null).toString());
         } catch (NoSuchFieldException | IllegalAccessException | IllegalArgumentException ignored) {
@@ -97,19 +92,4 @@ public abstract class StructSchema<T> extends AbstractStructSchema<T> {
                     : ReflectData.get().getSchema(pojo);
         }
     }
-
-    protected static Schema parseAvroSchema(String schemaJson) {
-        final Schema.Parser parser = new Schema.Parser();
-        parser.setValidateDefaults(false);
-        return parser.parse(schemaJson);
-    }
-
-    public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
-        return SchemaInfo.builder()
-                .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
-                .properties(schemaDefinition.getProperties())
-                .name("")
-                .type(schemaType).build();
-    }
-
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index 8fd8154..9116e28 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -32,6 +32,7 @@ import java.time.LocalDate;
 import java.time.LocalTime;
 import java.util.Arrays;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import lombok.Data;
@@ -50,10 +51,15 @@ import org.apache.avro.reflect.Nullable;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
 import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaDefinitionBuilder;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
 import org.apache.pulsar.client.avro.generated.NasaMission;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
+import org.apache.pulsar.client.impl.schema.reader.JacksonJsonReader;
 import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
+import org.apache.pulsar.client.impl.schema.writer.JacksonJsonWriter;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.joda.time.DateTime;
@@ -392,4 +398,25 @@ public class AvroSchemaTest {
         Assert.assertEquals(((BufferedBinaryEncoder)encoder).bytesBuffered(), 0);
     }
 
+    @Test
+    public void testAvroSchemaUserDefinedReadAndWriter() {
+        SchemaReader<Foo> reader = new JacksonJsonReader<>(new ObjectMapper(), Foo.class);
+        SchemaWriter<Foo> writer = new JacksonJsonWriter<>(new ObjectMapper());
+        SchemaDefinition<Foo> schemaDefinition = SchemaDefinition.<Foo>builder()
+                .withPojo(Bar.class)
+                .withSchemaReader(reader)
+                .withSchemaWriter(writer)
+                .build();
+
+        AvroSchema<Foo> schema = AvroSchema.of(schemaDefinition);
+        Foo foo = new Foo();
+        foo.setColor(SchemaTestUtils.Color.RED);
+        String field1 = "test";
+        foo.setField1(field1);
+        schema.encode(foo);
+        foo = schema.decode(schema.encode(foo));
+        assertEquals(foo.getColor(), SchemaTestUtils.Color.RED);
+        assertEquals(field1, foo.getField1());
+    }
+
 }
diff --git a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkTest.java b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkTest.java
index a2e77cd..ef71e00 100644
--- a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkTest.java
+++ b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkTest.java
@@ -28,10 +28,10 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AbstractStructSchema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
-import org.apache.pulsar.client.impl.schema.StructSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
 import org.apache.pulsar.functions.api.Record;
@@ -144,7 +144,7 @@ public class InfluxDBSinkTest {
         openWriteClose(jsonSchema);
     }
 
-    private void openWriteClose(StructSchema<Cpu> schema) throws Exception {
+    private void openWriteClose(AbstractStructSchema<Cpu> schema) throws Exception {
         // test open
         Map<String, Object> map = new HashMap();
         map.put("influxdbUrl", "http://localhost:9999");
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestAvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestAvroSchemaHandler.java
index 7b150ea..4dbda98 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestAvroSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestAvroSchemaHandler.java
@@ -28,7 +28,6 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
-import org.apache.pulsar.client.impl.schema.StructSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
 import org.apache.pulsar.common.api.raw.RawMessage;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -41,6 +40,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaInfo;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.when;
 
@@ -77,7 +77,7 @@ public class TestAvroSchemaHandler {
         Schema schema1 = ReflectData.AllowNull.get().getSchema(Foo1.class);
         PulsarSqlSchemaInfoProvider pulsarSqlSchemaInfoProvider = mock(PulsarSqlSchemaInfoProvider.class);
         AvroSchemaHandler avroSchemaHandler = new AvroSchemaHandler(pulsarSqlSchemaInfoProvider,
-                StructSchema.parseSchemaInfo(SchemaDefinition.builder().withPojo(Foo2.class).build(), SchemaType.AVRO), columnHandles);
+                parseSchemaInfo(SchemaDefinition.builder().withPojo(Foo2.class).build(), SchemaType.AVRO), columnHandles);
         byte[] schemaVersion = new byte[8];
         for (int i = 0 ; i<8; i++) {
             schemaVersion[i] = 0;
@@ -100,7 +100,7 @@ public class TestAvroSchemaHandler {
         when(message.getData()).thenReturn(ByteBufAllocator.DEFAULT
                 .buffer(bytes.length, bytes.length).writeBytes(byteArrayOutputStream.toByteArray()));
         when(pulsarSqlSchemaInfoProvider.getSchemaByVersion(any()))
-                .thenReturn(completedFuture(StructSchema.parseSchemaInfo(SchemaDefinition.builder()
+                .thenReturn(completedFuture(parseSchemaInfo(SchemaDefinition.builder()
                         .withPojo(Foo1.class).build(), SchemaType.AVRO)));
 
         Object object  = ((GenericAvroRecord)avroSchemaHandler.deserialize(message.getData(),