You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/28 04:04:50 UTC

[pulsar] branch master updated: Add the multi version schema support (#3876)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d5ff082  Add the multi version schema support (#3876)
d5ff082 is described below

commit d5ff0828d222775d0562a1b5d589e3f231a1716f
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sun Apr 28 12:04:45 2019 +0800

    Add the multi version schema support (#3876)
    
    ### Motivation
    
    Fix #3742
    
    In order to decode a message correctly with an AVRO schema, we need to know which schema version the message was written with.
    
    ### Modification
    
    - Introduced Schema Reader and Schema Writer for StructSchema.
       - Reader is used to decode message
       - Writer is used to encode message
    - The implementations of StructSchema provide their own schema reader and writer implementations.
    - Introduced a schema reader cache for caching the readers for different schema versions.
---
 .../java/org/apache/pulsar/client/api/Schema.java  |  18 +++-
 .../pulsar/client/api/schema/GenericSchema.java    |   2 +-
 .../pulsar/client/api/schema/SchemaDefinition.java |   7 ++
 .../client/api/schema/SchemaDefinitionBuilder.java |  15 ++-
 ...{GenericSchema.java => SchemaInfoProvider.java} |  27 ++++--
 .../pulsar/client/api/schema/SchemaReader.java     |  18 ++--
 .../pulsar/client/api/schema/SchemaWriter.java     |  18 ++--
 .../client/internal/DefaultImplementation.java     |   8 +-
 .../apache/pulsar/common/schema/SchemaInfo.java    |   2 +
 .../org/apache/pulsar/client/impl/MessageImpl.java |   7 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |   7 ++
 .../pulsar/client/impl/PulsarClientImpl.java       |  94 ++++++++++++-------
 .../client/impl/schema/AutoConsumeSchema.java      |  11 +++
 .../pulsar/client/impl/schema/AvroSchema.java      |  85 ++++++-----------
 .../pulsar/client/impl/schema/JSONSchema.java      |  46 +++------
 .../pulsar/client/impl/schema/ProtobufSchema.java  |  87 +++++++++--------
 .../impl/schema/SchemaDefinitionBuilderImpl.java   |  12 ++-
 .../client/impl/schema/SchemaDefinitionImpl.java   |  19 ++--
 .../pulsar/client/impl/schema/StructSchema.java    | 104 +++++++++++++++++----
 ...nericAvroSchema.java => GenericAvroReader.java} |  75 +++++++--------
 .../impl/schema/generic/GenericAvroSchema.java     |  64 ++++---------
 ...nericAvroSchema.java => GenericAvroWriter.java} |  51 ++--------
 ...nericJsonSchema.java => GenericJsonReader.java} |  46 ++++-----
 .../impl/schema/generic/GenericJsonSchema.java     |  43 ++++-----
 ...{SchemaProvider.java => GenericJsonWriter.java} |  32 ++++---
 .../impl/schema/generic/GenericSchemaImpl.java     |  32 ++-----
 .../schema/generic/MultiVersionGenericSchema.java  |  62 ------------
 ...er.java => MultiVersionSchemaInfoProvider.java} |  45 +++++----
 .../client/impl/schema/reader/AvroReader.java      |  58 ++++++++++++
 .../client/impl/schema/reader/JsonReader.java      |  49 ++++------
 .../client/impl/schema/reader/ProtobufReader.java  |  47 ++++------
 .../client/impl/schema/writer/AvroWriter.java      |  53 +++++++++++
 .../client/impl/schema/writer/JsonWriter.java      |  37 ++++----
 .../ProtobufWriter.java}                           |  21 ++---
 .../client/impl/schema/ProtobufSchemaTest.java     |  21 ++---
 .../client/impl/schema/SchemaBuilderTest.java      |   2 +-
 .../pulsar/client/impl/schema/SchemaTestUtils.java |  12 +++
 .../schema/SupportVersioningAvroSchemaTest.java    |  66 +++++++++++++
 .../impl/schema/generic/GenericAvroReaderTest.java |  86 +++++++++++++++++
 .../impl/schema/generic/GenericSchemaImplTest.java |  64 +++++++++++--
 .../generic/MultiVersionGenericSchemaTest.java     |  90 ------------------
 ...ava => MultiVersionSchemaInfoProviderTest.java} |  13 ++-
 .../io/hbase/sink/HbaseGenericRecordSinkTest.java  |  18 +++-
 .../io/influxdb/InfluxDBGenericRecordSinkTest.java |  14 ++-
 .../org/apache/pulsar/io/jdbc/JdbcSinkTest.java    |  21 +++--
 .../pulsar/io/solr/SolrGenericRecordSinkTest.java  |  17 +++-
 .../tests/integration/schema/SchemaTest.java       |  56 ++++++++++-
 .../pulsar/tests/integration/schema/Schemas.java   |  18 ++++
 48 files changed, 1031 insertions(+), 769 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index 18e02bf..72de4fe 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -26,6 +26,7 @@ import java.util.Date;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -80,6 +81,9 @@ public interface Schema<T> {
         return false;
     }
 
+    default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+    }
+
     /**
      * Decode a byte array into an object using the schema definition and deserializer implementation
      *
@@ -183,7 +187,17 @@ public interface Schema<T> {
      * @return a Schema instance
      */
     static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Class<T> clazz) {
-        return DefaultImplementation.newProtobufSchema(clazz);
+        return DefaultImplementation.newProtobufSchema(SchemaDefinition.builder().withPojo(clazz).build());
+    }
+
+    /**
+     * Create a Protobuf schema type with schema definition.
+     *
+     * @param schemaDefinition the definition of the schema
+     * @return a Schema instance
+     */
+    static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(SchemaDefinition<T> schemaDefinition) {
+        return DefaultImplementation.newProtobufSchema(schemaDefinition);
     }
 
     /**
@@ -300,7 +314,7 @@ public interface Schema<T> {
      * @param schemaInfo schema info
      * @return a generic schema instance
      */
-    static GenericSchema generic(SchemaInfo schemaInfo) {
+    static GenericSchema<GenericRecord> generic(SchemaInfo schemaInfo) {
         return DefaultImplementation.getGenericSchema(schemaInfo);
     }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java
index 8b5137d..95e6939 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.client.api.Schema;
 /**
  * A schema that serializes and deserializes between {@link GenericRecord} and bytes.
  */
-public interface GenericSchema extends Schema<GenericRecord> {
+public interface GenericSchema<T extends GenericRecord> extends Schema<T> {
 
     /**
      * Returns the list of fields.
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
index daf90df..32fddec 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
@@ -61,4 +61,11 @@ public interface SchemaDefinition<T> {
      * @return pojo schema
      */
     public Class<T> getPojo();
+
+    /**
+     * Get supportSchemaVersioning schema definition
+     *
+     * @return the flag of supportSchemaVersioning
+     */
+    public boolean getSupportSchemaVersioning();
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
index 77bb363..8305f02 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
@@ -49,7 +49,7 @@ public interface SchemaDefinitionBuilder<T> {
      * @param key property key
      * @param value property value
      *
-     * @return record schema definition
+     * @return schema definition builder
      */
     SchemaDefinitionBuilder<T> addProperty(String key, String value);
 
@@ -58,7 +58,7 @@ public interface SchemaDefinitionBuilder<T> {
      *
      * @param pojo pojo schema definition
      *
-     * @return record schema definition
+     * @return schema definition builder
      */
     SchemaDefinitionBuilder<T> withPojo(Class pojo);
 
@@ -67,11 +67,20 @@ public interface SchemaDefinitionBuilder<T> {
      *
      * @param jsonDefinition json schema definition
      *
-     * @return record schema definition
+     * @return schema definition builder
      */
     SchemaDefinitionBuilder<T> withJsonDef(String jsonDefinition);
 
     /**
+     * Set whether the schema supports decoding by schema version
+     *
+     * @param supportSchemaVersioning decode by version
+     *
+     * @return schema definition builder
+     */
+    SchemaDefinitionBuilder<T> withSupportSchemaVersioning(boolean supportSchemaVersioning);
+
+    /**
      * Build the schema definition.
      *
      * @return the schema definition.
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java
similarity index 61%
copy from pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java
copy to pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java
index 8b5137d..81e4f44 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java
@@ -18,26 +18,33 @@
  */
 package org.apache.pulsar.client.api.schema;
 
-import java.util.List;
-import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
- * A schema that serializes and deserializes between {@link GenericRecord} and bytes.
+ * Schema Provider.
  */
-public interface GenericSchema extends Schema<GenericRecord> {
+public interface SchemaInfoProvider {
 
     /**
-     * Returns the list of fields.
+     * Retrieve the schema info of a given <tt>schemaVersion</tt>.
      *
-     * @return the list of fields of generic record.
+     * @param schemaVersion schema version
+     * @return schema info of the provided <tt>schemaVersion</tt>
      */
-    List<Field> getFields();
+    SchemaInfo getSchemaByVersion(byte[] schemaVersion);
 
     /**
-     * Create a builder to build {@link GenericRecord}.
+     * Retrieve the latest schema info.
      *
-     * @return generic record builder
+     * @return the latest schema
      */
-    GenericRecordBuilder newRecordBuilder();
+    SchemaInfo getLatestSchema();
+
+    /**
+     * Retrieve the topic name.
+     *
+     * @return the topic name
+     */
+    public String getTopicName();
 
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
similarity index 67%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
copy to pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
index 18778e1..0f33935 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
@@ -16,21 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl.schema.generic;
+package org.apache.pulsar.client.api.schema;
 
-import org.apache.pulsar.client.api.Schema;
-
-/**
- * Schema Provider.
- */
-public interface SchemaProvider<T> {
+public interface SchemaReader<T> {
 
     /**
-     * Retrieve the schema instance of a given <tt>schemaVersion</tt>.
+     * deserialize the bytes into a pojo
      *
-     * @param schemaVersion schema version
-     * @return schema instance of the provided <tt>schemaVersion</tt>
+     * @param bytes the data
+     * @return the deserialized object
      */
-    Schema<T> getSchema(byte[] schemaVersion);
-
+    T read(byte[] bytes);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaWriter.java
similarity index 67%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
copy to pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaWriter.java
index 18778e1..a93739a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaWriter.java
@@ -16,21 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl.schema.generic;
+package org.apache.pulsar.client.api.schema;
 
-import org.apache.pulsar.client.api.Schema;
-
-/**
- * Schema Provider.
- */
-public interface SchemaProvider<T> {
+public interface SchemaWriter<T> {
 
     /**
-     * Retrieve the schema instance of a given <tt>schemaVersion</tt>.
+     * serialize the message into bytes
      *
-     * @param schemaVersion schema version
-     * @return schema instance of the provided <tt>schemaVersion</tt>
+     * @param message the message for encode
+     * @return the serialized bytes
      */
-    Schema<T> getSchema(byte[] schemaVersion);
-
+    byte[] write(T message);
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index ccc0a15..da48774 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -214,10 +214,10 @@ public class DefaultImplementation {
                         .invoke(null,schemaDefinition));
     }
 
-    public static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(Class<T> clazz) {
+    public static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(SchemaDefinition schemaDefinition) {
         return catchExceptions(
-                () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.ProtobufSchema", "of", Class.class)
-                        .invoke(null, clazz));
+                () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.ProtobufSchema", "of", SchemaDefinition.class)
+                        .invoke(null, schemaDefinition));
     }
 
     public static <T> Schema<T> newJSONSchema(SchemaDefinition schemaDefinition) {
@@ -262,7 +262,7 @@ public class DefaultImplementation {
                         "getSchema", SchemaInfo.class).invoke(null, schemaInfo));
     }
 
-    public static GenericSchema getGenericSchema(SchemaInfo schemaInfo) {
+    public static GenericSchema<GenericRecord> getGenericSchema(SchemaInfo schemaInfo) {
         return catchExceptions(
             () -> (GenericSchema) getStaticMethod("org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl",
                 "of", SchemaInfo.class).invoke(null, schemaInfo));
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index abd9dea..87d0e15 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Map;
 
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
@@ -31,6 +32,7 @@ import lombok.experimental.Accessors;
 @AllArgsConstructor
 @NoArgsConstructor
 @Accessors(chain = true)
+@Builder
 public class SchemaInfo {
 
     @EqualsAndHashCode.Exclude
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 6a38abb..6b9731b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -230,7 +230,7 @@ public class MessageImpl<T> implements Message<T> {
 
     @Override
     public byte[] getSchemaVersion() {
-        if (msgMetadataBuilder.hasSchemaVersion()) {
+        if (msgMetadataBuilder != null && msgMetadataBuilder.hasSchemaVersion()) {
             return msgMetadataBuilder.getSchemaVersion().toByteArray();
         } else {
             return null;
@@ -241,8 +241,9 @@ public class MessageImpl<T> implements Message<T> {
     public T getValue() {
         // check if the schema passed in from client supports schema versioning or not
         // this is an optimization to only get schema version when necessary
-        if (schema.supportSchemaVersioning()) {
-            return schema.decode(getData(), getSchemaVersion());
+        byte [] schemaVersion = getSchemaVersion();
+        if (schema.supportSchemaVersioning() && schemaVersion != null) {
+            return schema.decode(getData(), schemaVersion);
         } else {
             return schema.decode(getData());
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 8ebf1b9..312c1cd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -742,6 +742,13 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
         List<CompletableFuture<Consumer<T>>> futureList;
 
+        try {
+            client.preProcessSchemaBeforeSubscribe(client, schema, topicName);
+        } catch (Throwable t) {
+            subscribeResult.completeExceptionally(t);
+            return;
+        }
+
         if (numPartitions > 1) {
             this.topics.putIfAbsent(topicName, numPartitions);
             allTopicPartitionsNumber.addAndGet(numPartitions);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index bdd3345..3da7e14 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -22,6 +22,9 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -52,6 +55,8 @@ import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -60,6 +65,7 @@ import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
@@ -98,6 +104,15 @@ public class PulsarClientImpl implements PulsarClient {
 
     private final EventLoopGroup eventLoopGroup;
 
+    private final LoadingCache<String, SchemaInfoProvider> schemaProviderLoadingCache = CacheBuilder.newBuilder().maximumSize(100000)
+                    .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<String, SchemaInfoProvider>() {
+
+                @Override
+                public SchemaInfoProvider load(String topicName) {
+                    return newSchemaProvider(topicName);
+                }
+            });
+
     public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
         this(conf, getEventLoopGroup(conf));
     }
@@ -291,24 +306,12 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
-        if (schema instanceof AutoConsumeSchema) {
-            AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema) schema;
-            return lookup.getSchema(TopicName.get(conf.getSingleTopic()))
-                    .thenCompose(schemaInfoOptional -> {
-                        if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) {
-                            GenericSchemaImpl genericSchema = GenericSchemaImpl.of(schemaInfoOptional.get());
-                            log.info("Auto detected schema for topic {} : {}",
-                                conf.getSingleTopic(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
-                            autoConsumeSchema.setSchema(genericSchema);
-                            return doSingleTopicSubscribeAsync(conf, schema, interceptors);
-                        } else {
-                            return FutureUtil.failedFuture(
-                                new PulsarClientException.LookupException("Currently schema detection only works for topics with avro schemas"));
-                        }
-                    });
-        } else {
-            return doSingleTopicSubscribeAsync(conf, schema, interceptors);
+        try {
+            preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic());
+        } catch (Throwable t) {
+            return FutureUtil.failedFuture(t);
         }
+        return doSingleTopicSubscribeAsync(conf, schema, interceptors);
     }
 
     private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
@@ -423,24 +426,12 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
-        if (schema instanceof AutoConsumeSchema) {
-            AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema) schema;
-            return lookup.getSchema(TopicName.get(conf.getTopicName()))
-                    .thenCompose(schemaInfoOptional -> {
-                        if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) {
-                            GenericSchemaImpl genericSchema = GenericSchemaImpl.of(schemaInfoOptional.get());
-                            log.info("Auto detected schema for topic {} : {}",
-                                conf.getTopicName(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
-                            autoConsumeSchema.setSchema(genericSchema);
-                            return doCreateReaderAsync(conf, schema);
-                        } else {
-                            return FutureUtil.failedFuture(
-                                new PulsarClientException.LookupException("Currently schema detection only works for topics with avro schemas"));
-                        }
-                    });
-        } else {
-            return doCreateReaderAsync(conf, schema);
+        try {
+            preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName());
+        } catch (Throwable t) {
+            return FutureUtil.failedFuture(t);
         }
+        return doCreateReaderAsync(conf, schema);
     }
     <T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
         if (state.get() != State.Open) {
@@ -737,4 +728,39 @@ public class PulsarClientImpl implements PulsarClient {
             return null;
         }
     }
+
+    private SchemaInfoProvider newSchemaProvider(String topicName) {
+        return new MultiVersionSchemaInfoProvider(TopicName.get(topicName), this);
+    }
+
+    private LoadingCache<String, SchemaInfoProvider> getSchemaProviderLoadingCache() {
+        return schemaProviderLoadingCache;
+    }
+
+    protected void preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl, Schema schema, String topicName) throws Throwable {
+        if (schema != null && schema.supportSchemaVersioning()) {
+            SchemaInfoProvider schemaInfoProvider = null;
+            try {
+                schemaInfoProvider = pulsarClientImpl.getSchemaProviderLoadingCache().get(topicName);
+            } catch (ExecutionException e) {
+                log.error("Failed to load schema info provider for topic {}", topicName, e);
+                throw e.getCause();
+            }
+
+            if (schema instanceof AutoConsumeSchema) {
+                SchemaInfo schemaInfo = schemaInfoProvider.getLatestSchema();
+                if (schemaInfo.getType() != SchemaType.AVRO){
+                    throw new RuntimeException("Currently schema detection only works for topics with avro schemas");
+
+                }
+                GenericSchema genericSchema = GenericSchemaImpl.of(schemaInfoProvider.getLatestSchema());
+                log.info("Auto detected schema for topic {} : {}",
+                        topicName, new String(schemaInfo.getSchema(), UTF_8));
+                ((AutoConsumeSchema) schema).setSchema(genericSchema);
+            }
+            schema.setSchemaInfoProvider(schemaInfoProvider);
+        }
+
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 6df185b..050716a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
@@ -49,6 +50,11 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
     }
 
     @Override
+    public boolean supportSchemaVersioning() {
+        return true;
+    }
+
+    @Override
     public byte[] encode(GenericRecord message) {
         ensureSchemaInitialized();
 
@@ -63,6 +69,11 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
     }
 
     @Override
+    public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+        schema.setSchemaInfoProvider(schemaInfoProvider);
+    }
+
+    @Override
     public SchemaInfo getSchemaInfo() {
         ensureSchemaInitialized();
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index a2ce0d1..efe08d2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -21,20 +21,16 @@ package org.apache.pulsar.client.impl.schema;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Conversions;
 import org.apache.avro.data.TimeConversions;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.impl.schema.reader.AvroReader;
+import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.util.Map;
 
 /**
@@ -42,15 +38,9 @@ import java.util.Map;
  */
 @Slf4j
 public class AvroSchema<T> extends StructSchema<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(AvroSchema.class);
 
-    private ReflectDatumWriter<T> datumWriter;
-    private ReflectDatumReader<T> reader;
-    private BinaryEncoder encoder;
-    private ByteArrayOutputStream byteArrayOutputStream;
-
-    private static final ThreadLocal<BinaryDecoder> decoders =
-            new ThreadLocal<>();
-//      the aim to fix avro's bug
+    //      the aim to fix avro's bug
 //      https://issues.apache.org/jira/browse/AVRO-1891  bug address explain
 //      fix the avro logical type read and write
     static {
@@ -77,53 +67,19 @@ public class AvroSchema<T> extends StructSchema<T> {
         reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
     }
 
-    private AvroSchema(org.apache.avro.Schema schema,
-                       SchemaDefinition schemaDefinition) {
-        super(
-            SchemaType.AVRO,
-            schema,
-            schemaDefinition.getProperties());
-        this.byteArrayOutputStream = new ByteArrayOutputStream();
-        this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder);
-        this.datumWriter = new ReflectDatumWriter<>(this.schema);
-        this.reader = new ReflectDatumReader<>(this.schema);
-    }
-
-    @Override
-    public synchronized byte[] encode(T message) {
-        try {
-            datumWriter.write(message, this.encoder);
-            this.encoder.flush();
-            return this.byteArrayOutputStream.toByteArray();
-        } catch (Exception e) {
-            throw new SchemaSerializationException(e);
-        } finally {
-            this.byteArrayOutputStream.reset();
-        }
-    }
-
-    @Override
-    public T decode(byte[] bytes) {
-        try {
-            BinaryDecoder decoderFromCache = decoders.get();
-            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, decoderFromCache);
-            if (decoderFromCache == null) {
-                decoders.set(decoder);
-            }
-            return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, decoder));
-        } catch (IOException e) {
-            throw new SchemaSerializationException(e);
-        }
+    private AvroSchema(SchemaInfo schemaInfo) {
+        super(schemaInfo);
+        setReader(new AvroReader<>(schema));
+        setWriter(new AvroWriter<>(schema));
     }
 
     @Override
-    public SchemaInfo getSchemaInfo() {
-        return this.schemaInfo;
+    public boolean supportSchemaVersioning() {
+        return true;
     }
 
     public static <T> AvroSchema<T> of(SchemaDefinition<T> schemaDefinition) {
-        return schemaDefinition.getJsonDef() == null ?
-                new AvroSchema<>(createAvroSchema(schemaDefinition), schemaDefinition) : new AvroSchema<>(parseAvroSchema(schemaDefinition.getJsonDef()), schemaDefinition);
+        return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
     }
 
     public static <T> AvroSchema<T> of(Class<T> pojo) {
@@ -132,7 +88,18 @@ public class AvroSchema<T> extends StructSchema<T> {
 
     public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties) {
         SchemaDefinition<T> schemaDefinition = SchemaDefinition.<T>builder().withPojo(pojo).withProperties(properties).build();
-        return new AvroSchema<>(createAvroSchema(schemaDefinition), schemaDefinition);
+        return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
     }
 
+    @Override
+    protected SchemaReader<T> loadReader(byte[] schemaVersion) { // builds the reader used for messages written with the given schema version
+        SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+        if (schemaInfo != null) {
+            return new AvroReader<>(parseAvroSchema(new String(schemaInfo.getSchema(), java.nio.charset.StandardCharsets.UTF_8)), schema); // decode explicitly as UTF-8 (as StructSchema does) instead of the platform charset
+        } else {
+            return reader; // no schema info for this version: fall back to the reader built from this schema's own definition
+        }
+    }
+
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 629b769..4646465 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -25,12 +25,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
+import org.apache.pulsar.client.impl.schema.reader.JsonReader;
+import org.apache.pulsar.client.impl.schema.writer.JsonWriter;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
-import java.io.IOException;
 import java.util.Map;
 
 /**
@@ -48,39 +50,17 @@ public class JSONSchema<T> extends StructSchema<T> {
     });
 
     private final Class<T> pojo;
-    private final ObjectMapper objectMapper;
 
-    private JSONSchema(org.apache.avro.Schema schema,
-                        SchemaDefinition<T> schemaDefinition) {
-        super(
-            SchemaType.JSON,
-            schema,
-            schemaDefinition.getProperties());
-        this.pojo = schemaDefinition.getPojo();
-        this.objectMapper = JSON_MAPPER.get();
+    private JSONSchema(SchemaInfo schemaInfo, Class<T> pojo) {
+        super(schemaInfo);
+        this.pojo = pojo;
+        setWriter(new JsonWriter<>(JSON_MAPPER.get()));
+        setReader(new JsonReader<>(JSON_MAPPER.get(), pojo));
     }
 
     @Override
-    public byte[] encode(T message) throws SchemaSerializationException {
-        try {
-            return objectMapper.writeValueAsBytes(message);
-        } catch (JsonProcessingException e) {
-            throw new SchemaSerializationException(e);
-        }
-    }
-
-    @Override
-    public T decode(byte[] bytes) {
-        try {
-            return objectMapper.readValue(bytes, this.pojo);
-        } catch (IOException e) {
-            throw new SchemaSerializationException(e);
-        }
-    }
-
-    @Override
-    public SchemaInfo getSchemaInfo() {
-        return this.schemaInfo;
+    protected SchemaReader<T> loadReader(byte[] schemaVersion) {
+        throw new RuntimeException("JSONSchema don't support schema versioning");
     }
 
     /**
@@ -108,9 +88,7 @@ public class JSONSchema<T> extends StructSchema<T> {
     }
 
     public static <T> JSONSchema<T> of(SchemaDefinition<T> schemaDefinition) {
-        String jsonDef = schemaDefinition.getJsonDef();
-            return jsonDef == null ? new JSONSchema<>(createAvroSchema(schemaDefinition), schemaDefinition) :
-                    new JSONSchema<>(parseAvroSchema(jsonDef), schemaDefinition);
+        return new JSONSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.JSON), schemaDefinition.getPojo());
     }
 
     public static <T> JSONSchema<T> of(Class<T> pojo) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index de61dca..9328908 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -21,28 +21,32 @@ package org.apache.pulsar.client.impl.schema;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.Descriptors;
-import com.google.protobuf.Parser;
+import com.google.protobuf.GeneratedMessageV3;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
-import org.apache.avro.protobuf.ProtobufDatumReader;
-import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.avro.protobuf.ProtobufData;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
+import org.apache.pulsar.client.impl.schema.reader.ProtobufReader;
+import org.apache.pulsar.client.impl.schema.writer.ProtobufWriter;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
 import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 /**
  * A schema implementation to deal with protobuf generated messages.
  */
 public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> extends StructSchema<T> {
 
-    private Parser<T> tParser;
     public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";
 
     @Getter
@@ -57,29 +61,19 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex
     }
 
     private static <T> org.apache.avro.Schema createProtobufAvroSchema(Class<T> pojo) {
-        ProtobufDatumReader datumReader = new ProtobufDatumReader(pojo);
-        return datumReader.getSchema();
+        return ProtobufData.get().getSchema(pojo);
     }
 
-    private ProtobufSchema(Map<String, String> properties, Class<T> pojo) {
-        super(
-            SchemaType.PROTOBUF,
-            createProtobufAvroSchema(pojo),
-            properties);
+    private ProtobufSchema(SchemaInfo schemaInfo, T protoMessageInstance) {
+        super(schemaInfo);
+        setReader(new ProtobufReader<>(protoMessageInstance));
+        setWriter(new ProtobufWriter<>());
         // update properties with protobuf related properties
-        try {
-            T protoMessageInstance = (T) pojo.getMethod("getDefaultInstance").invoke(null);
-            tParser = (Parser<T>) protoMessageInstance.getParserForType();
-
-            Map<String, String> allProperties = new HashMap<>();
-            allProperties.putAll(schemaInfo.getProperties());
-            // set protobuf parsing info
-            allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance));
-
-            schemaInfo.setProperties(allProperties);
-        } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-            throw new IllegalArgumentException(e);
-        }
+        Map<String, String> allProperties = new HashMap<>();
+        allProperties.putAll(schemaInfo.getProperties());
+        // set protobuf parsing info
+        allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance));
+        schemaInfo.setProperties(allProperties);
     }
 
     private String getParsingInfo(T protoMessageInstance) {
@@ -101,38 +95,43 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex
     }
 
     @Override
-    public byte[] encode(T message) {
-        return message.toByteArray();
-    }
+    protected SchemaReader<T> loadReader(byte[] schemaVersion) {
+        throw new RuntimeException("ProtobufSchema don't support schema versioning");    }
 
-    @Override
-    public T decode(byte[] bytes) {
-        try {
-            return this.tParser.parseFrom(bytes);
-        } catch (Exception e) {
-            throw new RuntimeException(new SchemaSerializationException(e));
-        }
+    public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(Class<T> pojo) {
+        return of(pojo, new HashMap<>());
     }
 
-    @Override
-    public SchemaInfo getSchemaInfo() {
-        return schemaInfo;
+    public static <T> ProtobufSchema ofGenericClass(Class<T> pojo, Map<String, String> properties) {
+        SchemaDefinition<T> schemaDefinition = SchemaDefinition.<T>builder().withPojo(pojo).withProperties(properties).build();
+        return ProtobufSchema.of(schemaDefinition);
     }
 
-    public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(Class<T> pojo) {
-        return of(pojo, Collections.emptyMap());
-    }
+    public static <T> ProtobufSchema of(SchemaDefinition<T> schemaDefinition) {
+        Class<T> pojo = schemaDefinition.getPojo();
 
-    public static ProtobufSchema ofGenericClass(Class pojo, Map<String, String> properties) {
         if (!com.google.protobuf.GeneratedMessageV3.class.isAssignableFrom(pojo)) {
             throw new IllegalArgumentException(com.google.protobuf.GeneratedMessageV3.class.getName()
                     + " is not assignable from " + pojo.getName());
         }
-        return new ProtobufSchema<>(properties, pojo);
+
+            SchemaInfo schemaInfo = SchemaInfo.builder()
+                    .schema(createProtobufAvroSchema(schemaDefinition.getPojo()).toString().getBytes(UTF_8))
+                    .type(SchemaType.PROTOBUF)
+                    .name("")
+                    .properties(schemaDefinition.getProperties())
+                    .build();
+
+        try {
+            return new ProtobufSchema(schemaInfo,
+                (GeneratedMessageV3) pojo.getMethod("getDefaultInstance").invoke(null));
+        }catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+            throw new IllegalArgumentException(e);
+        }
     }
 
     public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(
-            Class<T> pojo, Map<String, String> properties){
+            Class pojo, Map<String, String> properties){
         return ofGenericClass(pojo, properties);
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
index 2db6cf4..ea9ede5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
@@ -55,6 +55,11 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
      */
     private String jsonDef;
 
+    /**
+     * Flag indicating whether messages are decoded by schema version.
+     */
+    private boolean supportSchemaVersioning = false;
+
     @Override
     public SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull) {
         this.alwaysAllowNull = alwaysAllowNull;
@@ -79,6 +84,11 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
         return this;
     }
 
+    @Override
+    public SchemaDefinitionBuilder<T> withSupportSchemaVersioning(boolean supportSchemaVersioning) {
+        this.supportSchemaVersioning = supportSchemaVersioning;
+        return this;
+    }
 
     @Override
     public SchemaDefinitionBuilder<T> withProperties(Map<String,String> properties) {
@@ -89,7 +99,7 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
     @Override
     public  SchemaDefinition<T> build() {
         properties.put(ALWAYS_ALLOW_NULL, this.alwaysAllowNull ? "true" : "false");
-        return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties);
+        return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning);
 
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
index 04f1a24..8ace246 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.impl.schema;
 
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -48,11 +48,14 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{
 
     private String jsonDef;
 
-    public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties) {
+    private boolean supportSchemaVersioning;
+
+    public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties, boolean supportSchemaVersioning) {
         this.alwaysAllowNull = alwaysAllowNull;
         this.properties = properties;
         this.jsonDef = jsonDef;
         this.pojo = pojo;
+        this.supportSchemaVersioning = supportSchemaVersioning;
     }
     /**
      * get schema whether always allow null or not
@@ -60,7 +63,6 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{
      * @return schema always null or not
      */
     public boolean getAlwaysAllowNull() {
-
         return alwaysAllowNull;
     }
 
@@ -70,7 +72,6 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{
      * @return schema class
      */
     public String getJsonDef() {
-
         return jsonDef;
     }
     /**
@@ -83,16 +84,18 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{
         return pojo;
     }
 
+    @Override
+    public boolean getSupportSchemaVersioning() {
+        return supportSchemaVersioning;
+    }
+
     /**
      * Get schema class
      *
      * @return schema class
      */
     public Map<String, String> getProperties() {
-
-        return properties;
+        return Collections.unmodifiableMap(properties);
     }
 
-
-
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index 31156d4..89427d4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -20,13 +20,25 @@ package org.apache.pulsar.client.impl.schema;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import org.apache.avro.Schema.Parser;
 import org.apache.avro.reflect.ReflectData;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is a base schema implementation for `Struct` types.
@@ -38,39 +50,99 @@ import org.apache.pulsar.common.schema.SchemaType;
  * {@link org.apache.pulsar.common.schema.SchemaType#JSON},
  * and {@link org.apache.pulsar.common.schema.SchemaType#PROTOBUF}.
  */
-abstract class StructSchema<T> implements Schema<T> {
+public abstract class StructSchema<T> implements Schema<T> {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(StructSchema.class);
 
     protected final org.apache.avro.Schema schema;
     protected final SchemaInfo schemaInfo;
+    protected SchemaReader<T> reader;
+    protected SchemaWriter<T> writer;
+    protected SchemaInfoProvider schemaInfoProvider;
+    private final LoadingCache<java.nio.ByteBuffer, SchemaReader<T>> readerCache = CacheBuilder.newBuilder().maximumSize(100000) // ByteBuffer keys compare by content; byte[] keys compare by identity and would never produce a cache hit
+            .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<java.nio.ByteBuffer, SchemaReader<T>>() {
+                @Override
+                public SchemaReader<T> load(java.nio.ByteBuffer schemaVersion) {
+                    return loadReader(schemaVersion.array()); // unwrap the array that decode(bytes, schemaVersion) wrapped
+                }
+            });
 
-    protected StructSchema(SchemaType schemaType,
-                           org.apache.avro.Schema schema,
-                           Map<String, String> properties) {
-        this.schema = schema;
-        this.schemaInfo = new SchemaInfo();
-        this.schemaInfo.setName("");
-        this.schemaInfo.setType(schemaType);
-        this.schemaInfo.setSchema(this.schema.toString().getBytes(UTF_8));
-        this.schemaInfo.setProperties(properties);
+    protected StructSchema(SchemaInfo schemaInfo) {
+        this.schema = parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8));
+        this.schemaInfo = schemaInfo;
     }
 
-    protected org.apache.avro.Schema getAvroSchema() {
+    public org.apache.avro.Schema getAvroSchema() {
         return schema;
     }
 
     @Override
+    public byte[] encode(T message) {
+        return writer.write(message);
+    }
+
+    @Override
+    public T decode(byte[] bytes) {
+        return reader.read(bytes);
+    }
+
+    @Override
+    public T decode(byte[] bytes, byte[] schemaVersion) { // decodes with the (cached) reader for the given schema version
+        try {
+            return readerCache.get(java.nio.ByteBuffer.wrap(schemaVersion)).read(bytes);
+        } catch (ExecutionException e) {
+            LOG.error("Can't get generic schema for topic {} schema version {}",
+                    schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
+            throw new RuntimeException("Can't get generic schema for topic " + schemaInfoProvider.getTopicName(), e); // keep the cause so the loader failure is not lost
+        }
+    }
+
+    @Override
     public SchemaInfo getSchemaInfo() {
         return this.schemaInfo;
     }
 
     protected static org.apache.avro.Schema createAvroSchema(SchemaDefinition schemaDefinition) {
         Class pojo = schemaDefinition.getPojo();
-        return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo) : ReflectData.get().getSchema(pojo);
+
+        if (StringUtils.isNotBlank(schemaDefinition.getJsonDef())) {
+            return parseAvroSchema(schemaDefinition.getJsonDef());
+        } else if (pojo != null) {
+            return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo) : ReflectData.get().getSchema(pojo);
+        } else {
+            throw new RuntimeException("Schema definition must specify pojo class or schema json definition");
+        }
+    }
+
+    protected static org.apache.avro.Schema parseAvroSchema(String schemaJson) {
+        final Parser parser = new Parser();
+        return parser.parse(schemaJson);
+    }
+
+    protected static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
+        return SchemaInfo.builder()
+                .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
+                .properties(schemaDefinition.getProperties())
+                .name("")
+                .type(schemaType).build();
+    }
+
+    public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+        this.schemaInfoProvider = schemaInfoProvider;
+    }
+
+    protected abstract SchemaReader<T> loadReader(byte[] schemaVersion);
+
+    protected void setWriter(SchemaWriter<T> writer) {
+        this.writer = writer;
+    }
+
+    protected void setReader(SchemaReader<T> reader) {
+        this.reader = reader;
     }
 
-    protected static org.apache.avro.Schema parseAvroSchema(String jsonDef) {
-        Parser parser = new Parser();
-        return parser.parse(jsonDef);
+    protected SchemaReader<T> getReader() {
+        return  reader;
     }
 
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
similarity index 54%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
index 7cca2ac..bc4f65e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
@@ -18,69 +18,62 @@
  */
 package org.apache.pulsar.client.impl.schema.generic;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
-import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
 
-/**
- * A generic avro schema.
- */
-class GenericAvroSchema extends GenericSchemaImpl {
 
-    private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> datumWriter;
+public class GenericAvroReader implements SchemaReader<GenericRecord> {
+
+    private final GenericDatumReader<org.apache.avro.generic.GenericRecord> reader; // yields Avro records; read() wraps each one in a GenericAvroRecord
     private BinaryEncoder encoder;
     private final ByteArrayOutputStream byteArrayOutputStream;
-    private final GenericDatumReader<org.apache.avro.generic.GenericRecord> datumReader;
-
-    public GenericAvroSchema(SchemaInfo schemaInfo) {
-        super(schemaInfo);
-        this.byteArrayOutputStream = new ByteArrayOutputStream();
-        this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
-        this.datumWriter = new GenericDatumWriter(schema);
-        this.datumReader = new GenericDatumReader(schema);
+    private final List<Field> fields;
+    private final Schema schema;
+    private final byte[] schemaVersion;
+    public GenericAvroReader(Schema schema) {
+        this(null, schema, null);
     }
 
-    @Override
-    public synchronized byte[] encode(GenericRecord message) {
-        checkArgument(message instanceof GenericAvroRecord);
-        GenericAvroRecord gar = (GenericAvroRecord) message;
-        try {
-            datumWriter.write(gar.getAvroRecord(), this.encoder);
-            this.encoder.flush();
-            return this.byteArrayOutputStream.toByteArray();
-        } catch (Exception e) {
-            throw new SchemaSerializationException(e);
-        } finally {
-            this.byteArrayOutputStream.reset();
+    public GenericAvroReader(Schema writerSchema, Schema readerSchema, byte[] schemaVersion) {
+        this.schema = readerSchema;
+        this.fields = schema.getFields()
+                .stream()
+                .map(f -> new Field(f.name(), f.pos()))
+                .collect(Collectors.toList());
+        this.schemaVersion = schemaVersion;
+        if (writerSchema == null) {
+            this.reader = new GenericDatumReader<>(readerSchema);
+        } else {
+            this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
         }
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
     }
 
     @Override
-    public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
+    public GenericAvroRecord read(byte[] bytes) {
         try {
             Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
-            org.apache.avro.generic.GenericRecord avroRecord = datumReader.read(
-                null,
-                decoder);
+            org.apache.avro.generic.GenericRecord avroRecord =
+                    (org.apache.avro.generic.GenericRecord)reader.read(
+                    null,
+                    decoder);
             return new GenericAvroRecord(schemaVersion, schema, fields, avroRecord);
         } catch (IOException e) {
             throw new SchemaSerializationException(e);
         }
     }
-
-    @Override
-    public GenericRecordBuilder newRecordBuilder() {
-        return new AvroRecordBuilderImpl(this);
-    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index 7cca2ac..42344d0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -18,69 +18,39 @@
  */
 package org.apache.pulsar.client.impl.schema.generic;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
+import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
  * A generic avro schema.
  */
-class GenericAvroSchema extends GenericSchemaImpl {
-
-    private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> datumWriter;
-    private BinaryEncoder encoder;
-    private final ByteArrayOutputStream byteArrayOutputStream;
-    private final GenericDatumReader<org.apache.avro.generic.GenericRecord> datumReader;
+public class GenericAvroSchema extends GenericSchemaImpl {
 
     public GenericAvroSchema(SchemaInfo schemaInfo) {
         super(schemaInfo);
-        this.byteArrayOutputStream = new ByteArrayOutputStream();
-        this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
-        this.datumWriter = new GenericDatumWriter(schema);
-        this.datumReader = new GenericDatumReader(schema);
+        setReader(new GenericAvroReader(schema));
+        setWriter(new GenericAvroWriter(schema));
     }
 
     @Override
-    public synchronized byte[] encode(GenericRecord message) {
-        checkArgument(message instanceof GenericAvroRecord);
-        GenericAvroRecord gar = (GenericAvroRecord) message;
-        try {
-            datumWriter.write(gar.getAvroRecord(), this.encoder);
-            this.encoder.flush();
-            return this.byteArrayOutputStream.toByteArray();
-        } catch (Exception e) {
-            throw new SchemaSerializationException(e);
-        } finally {
-            this.byteArrayOutputStream.reset();
-        }
+    public GenericRecordBuilder newRecordBuilder() {
+        return new AvroRecordBuilderImpl(this);
     }
 
-    @Override
-    public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
-        try {
-            Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
-            org.apache.avro.generic.GenericRecord avroRecord = datumReader.read(
-                null,
-                decoder);
-            return new GenericAvroRecord(schemaVersion, schema, fields, avroRecord);
-        } catch (IOException e) {
-            throw new SchemaSerializationException(e);
-        }
-    }
 
     @Override
-    public GenericRecordBuilder newRecordBuilder() {
-        return new AvroRecordBuilderImpl(this);
+    protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) { // builds the reader used for messages written with the given schema version
+         SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+         if (schemaInfo != null) {
+             return new GenericAvroReader(
+                     parseAvroSchema(new String(schemaInfo.getSchema(), java.nio.charset.StandardCharsets.UTF_8)), // decode explicitly as UTF-8 instead of the platform charset
+                     schema,
+                     schemaVersion);
+         } else {
+             return reader; // no schema info for this version: fall back to the reader built from this schema's own definition
+         }
     }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
similarity index 52%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
index 7cca2ac..28ad116 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
@@ -18,45 +18,32 @@
  */
 package org.apache.pulsar.client.impl.schema.generic;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
-import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
 
-/**
- * A generic avro schema.
- */
-class GenericAvroSchema extends GenericSchemaImpl {
+import java.io.ByteArrayOutputStream;
+
+public class GenericAvroWriter implements SchemaWriter<GenericRecord> {
 
-    private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> datumWriter;
+    private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> writer;
     private BinaryEncoder encoder;
     private final ByteArrayOutputStream byteArrayOutputStream;
-    private final GenericDatumReader<org.apache.avro.generic.GenericRecord> datumReader;
 
-    public GenericAvroSchema(SchemaInfo schemaInfo) {
-        super(schemaInfo);
+    public GenericAvroWriter(Schema schema) {
+        this.writer = new GenericDatumWriter<>(schema);
         this.byteArrayOutputStream = new ByteArrayOutputStream();
         this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
-        this.datumWriter = new GenericDatumWriter(schema);
-        this.datumReader = new GenericDatumReader(schema);
     }
 
     @Override
-    public synchronized byte[] encode(GenericRecord message) {
-        checkArgument(message instanceof GenericAvroRecord);
-        GenericAvroRecord gar = (GenericAvroRecord) message;
+    public synchronized byte[] write(GenericRecord message) {
         try {
-            datumWriter.write(gar.getAvroRecord(), this.encoder);
+            writer.write(((GenericAvroRecord)message).getAvroRecord(), this.encoder);
             this.encoder.flush();
             return this.byteArrayOutputStream.toByteArray();
         } catch (Exception e) {
@@ -65,22 +52,4 @@ class GenericAvroSchema extends GenericSchemaImpl {
             this.byteArrayOutputStream.reset();
         }
     }
-
-    @Override
-    public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
-        try {
-            Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
-            org.apache.avro.generic.GenericRecord avroRecord = datumReader.read(
-                null,
-                decoder);
-            return new GenericAvroRecord(schemaVersion, schema, fields, avroRecord);
-        } catch (IOException e) {
-            throw new SchemaSerializationException(e);
-        }
-    }
-
-    @Override
-    public GenericRecordBuilder newRecordBuilder() {
-        return new AvroRecordBuilderImpl(this);
-    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
similarity index 63%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
index d15d563..1d2fced 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
@@ -18,42 +18,37 @@
  */
 package org.apache.pulsar.client.impl.schema.generic;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
 import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
+import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
-/**
- * A generic json schema.
- */
-class GenericJsonSchema extends GenericSchemaImpl {
+import java.io.IOException;
+import java.util.List;
 
-    private final ObjectMapper objectMapper;
+import static java.nio.charset.StandardCharsets.UTF_8;
 
-    public GenericJsonSchema(SchemaInfo schemaInfo) {
-        super(schemaInfo);
+public class GenericJsonReader implements SchemaReader<GenericRecord> {
+
+    private final ObjectMapper objectMapper;
+    private final byte[] schemaVersion;
+    private final List<Field> fields;
+    public GenericJsonReader(List<Field> fields){
+        this.fields = fields;
+        this.schemaVersion = null;
         this.objectMapper = new ObjectMapper();
     }
 
-    @Override
-    public byte[] encode(GenericRecord message) {
-        checkArgument(message instanceof GenericAvroRecord);
-        GenericJsonRecord gjr = (GenericJsonRecord) message;
-        try {
-            return objectMapper.writeValueAsBytes(gjr.getJsonNode().toString());
-        } catch (IOException ioe) {
-            throw new SchemaSerializationException(ioe);
-        }
+    public GenericJsonReader(byte[] schemaVersion, List<Field> fields){
+        this.objectMapper = new ObjectMapper();
+        this.fields = fields;
+        this.schemaVersion = schemaVersion;
     }
-
     @Override
-    public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
+    public GenericJsonRecord read(byte[] bytes) {
         try {
             JsonNode jn = objectMapper.readTree(new String(bytes, UTF_8));
             return new GenericJsonRecord(schemaVersion, fields, jn);
@@ -61,9 +56,4 @@ class GenericJsonSchema extends GenericSchemaImpl {
             throw new SchemaSerializationException(ioe);
         }
     }
-
-    @Override
-    public GenericRecordBuilder newRecordBuilder() {
-        throw new UnsupportedOperationException("Json Schema doesn't support record builder yet");
-    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
index d15d563..14d939f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
@@ -18,47 +18,38 @@
  */
 package org.apache.pulsar.client.impl.schema.generic;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.nio.charset.StandardCharsets.UTF_8;
+import java.util.stream.Collectors;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
-import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
+import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 /**
  * A generic json schema.
  */
 class GenericJsonSchema extends GenericSchemaImpl {
 
-    private final ObjectMapper objectMapper;
-
     public GenericJsonSchema(SchemaInfo schemaInfo) {
         super(schemaInfo);
-        this.objectMapper = new ObjectMapper();
-    }
-
-    @Override
-    public byte[] encode(GenericRecord message) {
-        checkArgument(message instanceof GenericAvroRecord);
-        GenericJsonRecord gjr = (GenericJsonRecord) message;
-        try {
-            return objectMapper.writeValueAsBytes(gjr.getJsonNode().toString());
-        } catch (IOException ioe) {
-            throw new SchemaSerializationException(ioe);
-        }
+        setWriter(new GenericJsonWriter());
+        setReader(new GenericJsonReader(fields));
     }
 
     @Override
-    public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
-        try {
-            JsonNode jn = objectMapper.readTree(new String(bytes, UTF_8));
-            return new GenericJsonRecord(schemaVersion, fields, jn);
-        } catch (IOException ioe) {
-            throw new SchemaSerializationException(ioe);
+    protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
+        // Without schema info for this version, fall back to the default reader of this schema.
+        SchemaInfo versionedInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+        if (versionedInfo == null) {
+            return reader;
+        } else {
+            // Rebuild the field list from the schema definition stored for that version.
+            String definition = new String(versionedInfo.getSchema(), UTF_8);
+            return new GenericJsonReader(schemaVersion, parseAvroSchema(definition).getFields().stream()
+                    .map(avroField -> new Field(avroField.name(), avroField.pos())).collect(Collectors.toList()));
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonWriter.java
similarity index 53%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonWriter.java
index 18778e1..52b23e8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonWriter.java
@@ -18,19 +18,27 @@
  */
 package org.apache.pulsar.client.impl.schema.generic;
 
-import org.apache.pulsar.client.api.Schema;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
 
-/**
- * Schema Provider.
- */
-public interface SchemaProvider<T> {
+import java.io.IOException;
+
+public class GenericJsonWriter implements SchemaWriter<GenericRecord> {
+
+    private final ObjectMapper objectMapper;
 
-    /**
-     * Retrieve the schema instance of a given <tt>schemaVersion</tt>.
-     *
-     * @param schemaVersion schema version
-     * @return schema instance of the provided <tt>schemaVersion</tt>
-     */
-    Schema<T> getSchema(byte[] schemaVersion);
+    public GenericJsonWriter() {
+        this.objectMapper = new ObjectMapper();
+    }
 
+    @Override
+    public byte[] write(GenericRecord message) {
+        try { // NOTE(review): writing toString() below presumably emits a quoted JSON string, not an object - confirm
+            return objectMapper.writeValueAsBytes(((GenericJsonRecord)message).getJsonNode().toString());
+        } catch (IOException ioe) {
+            throw new SchemaSerializationException(ioe); // Jackson I/O failure is rethrown as a schema error
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
index 69b3e15..a8aa283 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
@@ -20,36 +20,27 @@ package org.apache.pulsar.client.impl.schema.generic;
 
 import java.util.List;
 import java.util.stream.Collectors;
-import org.apache.pulsar.client.api.Schema;
+
 import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.impl.schema.StructSchema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 /**
  * A generic schema representation.
  */
-public abstract class GenericSchemaImpl implements GenericSchema {
+public abstract class GenericSchemaImpl extends StructSchema<GenericRecord> implements GenericSchema<GenericRecord> {
 
-    protected final org.apache.avro.Schema schema;
     protected final List<Field> fields;
-    protected final SchemaInfo schemaInfo;
 
     protected GenericSchemaImpl(SchemaInfo schemaInfo) {
-        this.schemaInfo = schemaInfo;
-        this.schema = new org.apache.avro.Schema.Parser().parse(
-            new String(schemaInfo.getSchema(), UTF_8)
-        );
-        this.fields = schema.getFields()
-            .stream()
-            .map(f -> new Field(f.name(), f.pos()))
-            .collect(Collectors.toList());
-    }
+        super(schemaInfo);
 
-    public org.apache.avro.Schema getAvroSchema() {
-        return schema;
+        this.fields = schema.getFields()
+                .stream()
+                .map(f -> new Field(f.name(), f.pos()))
+                .collect(Collectors.toList());
     }
 
     @Override
@@ -57,11 +48,6 @@ public abstract class GenericSchemaImpl implements GenericSchema {
         return fields;
     }
 
-    @Override
-    public SchemaInfo getSchemaInfo() {
-        return schemaInfo;
-    }
-
     /**
      * Create a generic schema out of a <tt>SchemaInfo</tt>.
      *
@@ -75,7 +61,7 @@ public abstract class GenericSchemaImpl implements GenericSchema {
             case JSON:
                 return new GenericJsonSchema(schemaInfo);
             default:
-                throw new UnsupportedOperationException("Generic schema is not supported on schema type '"
+                throw new UnsupportedOperationException("Generic schema is not supported on schema type "
                     + schemaInfo.getType() + "'");
         }
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchema.java
deleted file mode 100644
index 457c281..0000000
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchema.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl.schema.generic;
-
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.schema.BytesSchema;
-import org.apache.pulsar.common.schema.SchemaInfo;
-
-/**
- * A schema implementation that handles schema versioning.
- */
-public class MultiVersionGenericSchema implements Schema<GenericRecord> {
-
-    private final SchemaProvider<GenericRecord> provider;
-
-    MultiVersionGenericSchema(SchemaProvider<GenericRecord> provider) {
-        this.provider = provider;
-    }
-
-    @Override
-    public byte[] encode(GenericRecord message) {
-        throw new UnsupportedOperationException("This schema implementation is only used for AUTO_CONSUME");
-    }
-
-    @Override
-    public boolean supportSchemaVersioning() {
-        return true;
-    }
-
-    @Override
-    public GenericRecord decode(byte[] bytes) {
-        return provider.getSchema(null).decode(bytes);
-    }
-
-    @Override
-    public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
-        return provider.getSchema(schemaVersion).decode(bytes, schemaVersion);
-    }
-
-    @Override
-    public SchemaInfo getSchemaInfo() {
-        // simulate it is a bytes schema
-        return BytesSchema.of().getSchemaInfo();
-    }
-}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
similarity index 61%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProvider.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
index fbf6c19..12e8956 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
@@ -21,8 +21,7 @@ package org.apache.pulsar.client.impl.schema.generic;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -37,28 +36,28 @@ import java.util.concurrent.TimeUnit;
 /**
  * Multi version generic schema provider by guava cache.
  */
-public class MultiVersionGenericSchemaProvider implements SchemaProvider<GenericRecord> {
+public class MultiVersionSchemaInfoProvider implements SchemaInfoProvider {
 
-    private static final Logger LOG = LoggerFactory.getLogger(MultiVersionGenericSchemaProvider.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MultiVersionSchemaInfoProvider.class);
 
     private final TopicName topicName;
     private final PulsarClientImpl pulsarClient;
 
-    private final LoadingCache<byte[], GenericSchema> cache = CacheBuilder.newBuilder().maximumSize(100000)
-            .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<byte[], GenericSchema>() {
+    private final LoadingCache<byte[], SchemaInfo> cache = CacheBuilder.newBuilder().maximumSize(100000)
+            .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<byte[], SchemaInfo>() {
                 @Override
-                public GenericSchema load(byte[] schemaVersion) throws Exception {
+                public SchemaInfo load(byte[] schemaVersion) throws Exception {
                     return loadSchema(schemaVersion);
                 }
             });
 
-    public MultiVersionGenericSchemaProvider(TopicName topicName, PulsarClientImpl pulsarClient) {
+    public MultiVersionSchemaInfoProvider(TopicName topicName, PulsarClientImpl pulsarClient) {
         this.topicName = topicName;
         this.pulsarClient = pulsarClient;
     }
 
     @Override
-    public GenericSchema getSchema(byte[] schemaVersion) {
+    public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
         try {
             if (null == schemaVersion) {
                 return null;
@@ -67,18 +66,32 @@ public class MultiVersionGenericSchemaProvider implements SchemaProvider<Generic
         } catch (ExecutionException e) {
             LOG.error("Can't get generic schema for topic {} schema version {}",
                     topicName.toString(), new String(schemaVersion, StandardCharsets.UTF_8), e);
-            return null;
+            throw new RuntimeException("Can't get generic schema for topic " + topicName.toString());
         }
     }
 
-    private GenericSchema loadSchema(byte[] schemaVersion) throws ExecutionException, InterruptedException {
-        Optional<SchemaInfo> schemaInfo = pulsarClient.getLookup()
-                .getSchema(topicName, schemaVersion).get();
-        return schemaInfo.map(GenericSchemaImpl::of).orElse(null);
+    @Override
+    public SchemaInfo getLatestSchema() {
+        try {
+            return pulsarClient.getLookup().getSchema(topicName).get().orElse(null); // null when no schema is found
+        } catch (ExecutionException | InterruptedException e) {
+            if (e instanceof InterruptedException) {
+                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+            }
+            LOG.error("Can't get current schema for topic {}", topicName.toString(), e);
+            throw new RuntimeException("Can't get current schema for topic " + topicName.toString(), e);
+        }
     }
 
-    public TopicName getTopic() {
-        return topicName;
+    @Override
+    public String getTopicName() {
+        return topicName.getLocalName();
+    }
+
+    private SchemaInfo loadSchema(byte[] schemaVersion) throws ExecutionException, InterruptedException {
+        // NOTE(review): a missing schema yields null here, and Guava cache loaders must not return null - confirm.
+        return pulsarClient.getLookup().getSchema(topicName, schemaVersion).get()
+                .orElse(null);
     }
 
     public PulsarClientImpl getPulsarClient() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
new file mode 100644
index 0000000..c502df5
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.reader;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+
+import java.io.IOException;
+
+/**
+ * {@link SchemaReader} that decodes Avro binary payloads into {@code T} with a {@link ReflectDatumReader}.
+ */
+public class AvroReader<T> implements SchemaReader<T> {
+
+    private final ReflectDatumReader<T> reader;
+    // One decoder per thread, handed back to DecoderFactory as the instance to reuse on later reads.
+    private static final ThreadLocal<BinaryDecoder> decoders =
+            new ThreadLocal<>();
+
+    /**
+     * Creates a reader that uses {@code schema} for decoding.
+     *
+     * @param schema the Avro schema passed to the underlying datum reader
+     */
+    public AvroReader(Schema schema) {
+        this.reader = new ReflectDatumReader<>(schema);
+    }
+
+    /**
+     * Creates a reader from a writer/reader schema pair.
+     *
+     * @param writerSchema the schema the payload was written with
+     * @param readerSchema the schema the decoded object should conform to
+     */
+    public AvroReader(Schema writerSchema, Schema readerSchema) {
+        this.reader = new ReflectDatumReader<>(writerSchema, readerSchema);
+    }
+
+    /**
+     * Decodes one Avro binary payload.
+     *
+     * @param bytes the serialized message
+     * @return the decoded object
+     * @throws SchemaSerializationException if the datum reader fails with an {@link IOException}
+     */
+    @Override
+    public T read(byte[] bytes) {
+        try {
+            BinaryDecoder decoderFromCache = decoders.get();
+            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, decoderFromCache);
+            if (decoderFromCache == null) {
+                decoders.set(decoder);
+            }
+            // The decoder is already initialised on bytes; asking the factory a second time was redundant.
+            return reader.read(null, decoder);
+        } catch (IOException e) {
+            throw new SchemaSerializationException(e);
+        }
+    }
+
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/JsonReader.java
similarity index 51%
copy from pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/JsonReader.java
index abd9dea..05ef7cb 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/JsonReader.java
@@ -16,38 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.common.schema;
+package org.apache.pulsar.client.impl.schema.reader;
 
-import java.util.Collections;
-import java.util.Map;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaReader;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-import lombok.experimental.Accessors;
+import java.io.IOException;
 
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@Accessors(chain = true)
-public class SchemaInfo {
+public class JsonReader<T> implements SchemaReader<T> {
+    private final Class<T> pojo;
+    private final ObjectMapper objectMapper;
 
-    @EqualsAndHashCode.Exclude
-    private String name;
+    public JsonReader(ObjectMapper objectMapper, Class<T> pojo) {
+        this.pojo = pojo;
+        this.objectMapper = objectMapper;
+    }
 
-    /**
-     * The schema data in AVRO JSON format
-     */
-    private byte[] schema;
-
-    /**
-     * The type of schema (AVRO, JSON, PROTOBUF, etc..)
-     */
-    private SchemaType type;
-
-    /**
-     * Additional properties of the schema definition (implementation defined)
-     */
-    private Map<String, String> properties = Collections.emptyMap();
+    @Override
+    public T read(byte[] bytes) {
+        try {
+            return objectMapper.readValue(bytes, this.pojo);
+        } catch (IOException e) {
+            throw new SchemaSerializationException(e);
+        }
+    }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
similarity index 51%
copy from pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
index abd9dea..bca952e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
@@ -16,38 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.common.schema;
+package org.apache.pulsar.client.impl.schema.reader;
 
-import java.util.Collections;
-import java.util.Map;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Parser;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaReader;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-import lombok.experimental.Accessors;
+public class ProtobufReader<T extends com.google.protobuf.GeneratedMessageV3> implements SchemaReader<T> {
+    private Parser<T> tParser;
 
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@Accessors(chain = true)
-public class SchemaInfo {
+    public ProtobufReader(T protoMessageInstance) {
+        tParser = (Parser<T>) (protoMessageInstance).getParserForType();
+    }
 
-    @EqualsAndHashCode.Exclude
-    private String name;
+    @Override
+    public T read(byte[] bytes) {
+        try {
+            return this.tParser.parseFrom(bytes);
+        } catch (InvalidProtocolBufferException e) {
+            throw new SchemaSerializationException(e);
+        }
+    }
 
-    /**
-     * The schema data in AVRO JSON format
-     */
-    private byte[] schema;
-
-    /**
-     * The type of schema (AVRO, JSON, PROTOBUF, etc..)
-     */
-    private SchemaType type;
-
-    /**
-     * Additional properties of the schema definition (implementation defined)
-     */
-    private Map<String, String> properties = Collections.emptyMap();
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java
new file mode 100644
index 0000000..912bca7
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.writer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
+
+import java.io.ByteArrayOutputStream;
+
+/**
+ * {@link SchemaWriter} that serializes objects to Avro binary with a {@link ReflectDatumWriter}.
+ *
+ * <p>One encoder and one output buffer are shared by all calls, and {@link #write} is synchronized.
+ */
+public class AvroWriter<T> implements SchemaWriter<T> {
+    private final ReflectDatumWriter<T> writer;
+    private BinaryEncoder encoder;
+    private ByteArrayOutputStream buffer;
+
+    /**
+     * Creates a writer for the given schema.
+     *
+     * @param schema the Avro schema handed to the underlying datum writer
+     */
+    public AvroWriter(Schema schema) {
+        writer = new ReflectDatumWriter<>(schema);
+        buffer = new ByteArrayOutputStream();
+        // No encoder exists yet, so there is nothing for the factory to reuse.
+        encoder = EncoderFactory.get().binaryEncoder(buffer, null);
+    }
+
+    /**
+     * Serializes {@code message} to Avro binary.
+     *
+     * @param message the object to encode
+     * @return a copy of the encoded bytes
+     * @throws SchemaSerializationException wrapping any exception raised while encoding
+     */
+    @Override
+    public synchronized byte[] write(T message) {
+        try {
+            writer.write(message, encoder);
+            encoder.flush();
+            return buffer.toByteArray();
+        } catch (Exception cause) {
+            throw new SchemaSerializationException(cause);
+        } finally {
+            // Reset the shared buffer on every path so the next call starts from an empty stream.
+            buffer.reset();
+        }
+    }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/JsonWriter.java
similarity index 52%
copy from pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/JsonWriter.java
index 8b5137d..d828c3f 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/JsonWriter.java
@@ -16,28 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.api.schema;
+package org.apache.pulsar.client.impl.schema.writer;
 
-import java.util.List;
-import org.apache.pulsar.client.api.Schema;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
 
-/**
- * A schema that serializes and deserializes between {@link GenericRecord} and bytes.
- */
-public interface GenericSchema extends Schema<GenericRecord> {
+public class JsonWriter<T> implements SchemaWriter<T> { // serializes messages to JSON bytes with the supplied ObjectMapper
 
-    /**
-     * Returns the list of fields.
-     *
-     * @return the list of fields of generic record.
-     */
-    List<Field> getFields();
+    private final ObjectMapper objectMapper;
 
-    /**
-     * Create a builder to build {@link GenericRecord}.
-     *
-     * @return generic record builder
-     */
-    GenericRecordBuilder newRecordBuilder();
+    public JsonWriter(ObjectMapper objectMapper) {
+        this.objectMapper = objectMapper;
+    }
 
+    @Override
+    public byte[] write(T message) {
+        try {
+            return objectMapper.writeValueAsBytes(message);
+        } catch (JsonProcessingException e) {
+            throw new SchemaSerializationException(e); // wraps the Jackson failure, keeping it as the cause
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java
similarity index 66%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java
index 18778e1..8c9e8f8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java
@@ -16,21 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl.schema.generic;
+package org.apache.pulsar.client.impl.schema.writer;
 
-import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
 
-/**
- * Schema Provider.
- */
-public interface SchemaProvider<T> {
-
-    /**
-     * Retrieve the schema instance of a given <tt>schemaVersion</tt>.
-     *
-     * @param schemaVersion schema version
-     * @return schema instance of the provided <tt>schemaVersion</tt>
-     */
-    Schema<T> getSchema(byte[] schemaVersion);
+public class ProtobufWriter<T extends com.google.protobuf.GeneratedMessageV3> implements SchemaWriter<T> {
 
+    @Override
+    public byte[] write(T message) {
+        return message.toByteArray(); // the message serializes itself; no separate encoder state is kept here
+    }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
index 5a9716e..57ccd56 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
@@ -22,13 +22,13 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
-import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.proto.Function;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
 
 @Slf4j
 public class ProtobufSchemaTest {
@@ -46,15 +46,14 @@ public class ProtobufSchemaTest {
             "\"type\":\"double\",\"default\":0}]}],\"default\":null},{\"name\":\"repeatedField\"," +
             "\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}}]}";
 
-    private static final String EXPECTED_PARSING_INFO = "{\"__PARSING_INFO__\":\"[{\\\"number\\\":1," +
-            "\\\"name\\\":\\\"stringField\\\",\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\"," +
-            "\\\"definition\\\":null},{\\\"number\\\":2,\\\"name\\\":\\\"doubleField\\\",\\\"type\\\":\\\"DOUBLE\\\"," +
-            "\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":6," +
-            "\\\"name\\\":\\\"intField\\\",\\\"type\\\":\\\"INT32\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\"," +
-            "\\\"definition\\\":null},{\\\"number\\\":4,\\\"name\\\":\\\"testEnum\\\",\\\"type\\\":\\\"ENUM\\\"," +
-            "\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":5," +
-            "\\\"name\\\":\\\"nestedField\\\",\\\"type\\\":\\\"MESSAGE\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\"," +
-            "\\\"definition\\\":null},{\\\"number\\\":10,\\\"name\\\":\\\"repeatedField\\\"," +
+    private static final String EXPECTED_PARSING_INFO = "{\"__alwaysAllowNull\":\"true\",\"__PARSING_INFO__\":" +
+            "\"[{\\\"number\\\":1,\\\"name\\\":\\\"stringField\\\",\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"" +
+            "LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":2,\\\"name\\\":\\\"doubleField\\\",\\\"type\\\"" +
+            ":\\\"DOUBLE\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":6,\\\"name\\\"" +
+            ":\\\"intField\\\",\\\"type\\\":\\\"INT32\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null}," +
+            "{\\\"number\\\":4,\\\"name\\\":\\\"testEnum\\\",\\\"type\\\":\\\"ENUM\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\"," +
+            "\\\"definition\\\":null},{\\\"number\\\":5,\\\"name\\\":\\\"nestedField\\\",\\\"type\\\":\\\"MESSAGE\\\",\\\"" +
+            "label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":10,\\\"name\\\":\\\"repeatedField\\\"," +
             "\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"LABEL_REPEATED\\\",\\\"definition\\\":null}]\"}";
 
     @Test
@@ -89,7 +88,7 @@ public class ProtobufSchemaTest {
         try {
             ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema
                     = ProtobufSchema.ofGenericClass(org.apache.pulsar.client.schema.proto.Test.TestMessage.class,
-                    Collections.emptyMap());
+                    new HashMap<>());
         } catch (Exception e) {
             Assert.fail("Should not construct a ProtobufShema over a non-protobuf-generated class");
         }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
index 456416f..207fd98 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
@@ -179,7 +179,7 @@ public class SchemaBuilderTest {
         SchemaInfo schemaInfo = recordSchemaBuilder.build(
             SchemaType.AVRO
         );
-        GenericSchema schema = Schema.generic(schemaInfo);
+        GenericSchema<GenericRecord> schema = Schema.generic(schemaInfo);
         GenericRecord record = schema.newRecordBuilder()
             .set(schema.getFields().get(0), 32)
             .set(schema.getFields().get(1), 1234L)
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java
index 98d5367..a7abc74 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java
@@ -47,6 +47,14 @@ public class SchemaTestUtils {
         @AvroDefault("\"defaultValue\"")
         private String fieldUnableNull;
     }
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class FooV2 { // two-field variant of Foo; the versioning tests encode it and read the bytes back as Foo
+        @Nullable
+        private String field1;
+        private int field3;
+    }
 
     @Data
     @ToString
@@ -126,4 +134,8 @@ public class SchemaTestUtils {
             "fieldUnableNull"
     };
 
+    public static final String TEST_MULTI_VERSION_SCHEMA_STRING = "TEST"; // field1 value for the multi-version test
+
+    public static final String TEST_MULTI_VERSION_SCHEMA_DEFAULT_STRING = "defaultValue"; // fieldUnableNull's @AvroDefault
+
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
new file mode 100644
index 0000000..47268a1
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.testng.Assert.assertEquals;
+
+/** Checks that a versioning-enabled {@code AvroSchema<Foo>} can decode a payload written with the FooV2 schema. */
+public class SupportVersioningAvroSchemaTest {
+    private MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider;
+    private AvroSchema<SchemaTestUtils.Foo> schema;
+    private GenericAvroSchema genericAvroSchema;
+    private AvroSchema<SchemaTestUtils.FooV2> avroFooV2Schema;
+
+    @BeforeMethod
+    public void setup() {
+        this.multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+        avroFooV2Schema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.FooV2>builder()
+                .withAlwaysAllowNull(false).withPojo(SchemaTestUtils.FooV2.class).build());
+        this.schema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder()
+                .withPojo(SchemaTestUtils.Foo.class)
+                .withAlwaysAllowNull(false)
+                .withSupportSchemaVersioning(true)
+                .build());
+        schema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+        SchemaInfo schemaInfo = avroFooV2Schema.schemaInfo;
+        genericAvroSchema = new GenericAvroSchema(schemaInfo);
+    }
+
+    /** The mocked provider answers every version with FooV2-derived info; fieldUnableNull must fall back to its default. */
+    @Test
+    public void testDecode() {
+        when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+                .thenReturn(genericAvroSchema.getSchemaInfo());
+        SchemaTestUtils.FooV2 fooV2 = new SchemaTestUtils.FooV2();
+        fooV2.setField1(SchemaTestUtils.TEST_MULTI_VERSION_SCHEMA_STRING);
+        SchemaTestUtils.Foo foo = schema.decode(avroFooV2Schema.encode(fooV2), new byte[10]);
+        assertEquals(foo.getField1(), SchemaTestUtils.TEST_MULTI_VERSION_SCHEMA_STRING);
+        assertEquals(foo.getFieldUnableNull(), SchemaTestUtils.TEST_MULTI_VERSION_SCHEMA_DEFAULT_STRING);
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReaderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReaderTest.java
new file mode 100644
index 0000000..d77d0f2
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReaderTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import static org.testng.Assert.assertEquals;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
+import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
+import org.apache.pulsar.client.impl.schema.SchemaTestUtils.FooV2;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/** Unit test for {@link GenericAvroReader}, with and without a reader schema that differs from the writer schema. */
+@Slf4j
+public class GenericAvroReaderTest {
+
+    private Foo foo;
+    private FooV2 fooV2;
+    private AvroSchema<Foo> fooSchemaNotNull;
+    private AvroSchema<Foo> fooSchema;
+    private AvroSchema<FooV2> fooV2Schema;
+
+    @BeforeMethod
+    public void setup() {
+        fooSchema = AvroSchema.of(Foo.class);
+        fooV2Schema = AvroSchema.of(FooV2.class);
+        fooSchemaNotNull = AvroSchema.of(SchemaDefinition.<Foo>builder()
+                .withAlwaysAllowNull(false)
+                .withPojo(Foo.class)
+                .build());
+
+        foo = new Foo();
+        foo.setField1("foo1");
+        foo.setField2("bar1");
+        foo.setField4(new SchemaTestUtils.Bar());
+        foo.setFieldUnableNull("notNull");
+
+        fooV2 = new FooV2();
+        fooV2.setField1("foo1");
+        fooV2.setField3(10);
+    }
+
+    /** Reads Foo bytes back with a reader built from the same Avro schema that encoded them. */
+    @Test
+    public void testGenericAvroReaderByWriterSchema() {
+        byte[] fooBytes = fooSchema.encode(foo);
+
+        GenericAvroReader genericAvroSchemaByWriterSchema = new GenericAvroReader(fooSchema.getAvroSchema());
+        GenericRecord genericRecordByWriterSchema = genericAvroSchemaByWriterSchema.read(fooBytes);
+        assertEquals(genericRecordByWriterSchema.getField("field1"), "foo1");
+        assertEquals(genericRecordByWriterSchema.getField("field2"), "bar1");
+        assertEquals(genericRecordByWriterSchema.getField("fieldUnableNull"), "notNull");
+    }
+
+    /** Reads FooV2 bytes with both the FooV2 and the Foo schema supplied; fieldUnableNull, absent in FooV2, is defaulted. */
+    @Test
+    public void testGenericAvroReaderByReaderSchema() {
+        byte[] fooV2Bytes = fooV2Schema.encode(fooV2);
+
+        GenericAvroReader genericAvroSchemaByReaderSchema = new GenericAvroReader(fooV2Schema.getAvroSchema(), fooSchemaNotNull.getAvroSchema(), new byte[10]);
+        GenericRecord genericRecordByReaderSchema = genericAvroSchemaByReaderSchema.read(fooV2Bytes);
+        assertEquals(genericRecordByReaderSchema.getField("fieldUnableNull"), "defaultValue");
+        assertEquals(genericRecordByReaderSchema.getField("field1"), "foo1");
+        assertEquals(genericRecordByReaderSchema.getField("field3"), 10);
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
index ed554cc..79c2486 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
@@ -19,12 +19,16 @@
 package org.apache.pulsar.client.impl.schema.generic;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
@@ -39,34 +43,46 @@ public class GenericSchemaImplTest {
     @Test
     public void testGenericAvroSchema() {
         Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
-        GenericSchemaImpl decodeSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo());
+        GenericSchema decodeSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo());
         testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
     }
 
     @Test
     public void testGenericJsonSchema() {
         Schema<Foo> encodeSchema = Schema.JSON(Foo.class);
-        GenericSchemaImpl decodeSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo());
+        GenericSchema decodeSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo());
         testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
     }
 
     @Test
     public void testAutoAvroSchema() {
-        Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
+        MultiVersionSchemaInfoProvider multiVersionGenericSchemaProvider = mock(MultiVersionSchemaInfoProvider.class);
         AutoConsumeSchema decodeSchema = new AutoConsumeSchema();
-        decodeSchema.setSchema(GenericSchemaImpl.of(encodeSchema.getSchemaInfo()));
-        testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+        Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
+        GenericSchema genericSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo());
+        genericSchema.setSchemaInfoProvider(multiVersionGenericSchemaProvider);
+        decodeSchema.setSchema(genericSchema);
+        when(multiVersionGenericSchemaProvider.getSchemaByVersion(any(byte[].class)))
+                .thenReturn(genericSchema.getSchemaInfo()); // every version lookup yields the encoder's own schema info
+
+        testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema); // helper decodes with an explicit schema version
     }
 
     @Test
     public void testAutoJsonSchema() {
+        MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
         Schema<Foo> encodeSchema = Schema.JSON(Foo.class);
+        GenericSchema genericSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo());
+        genericSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
         AutoConsumeSchema decodeSchema = new AutoConsumeSchema();
-        decodeSchema.setSchema(GenericSchemaImpl.of(encodeSchema.getSchemaInfo()));
-        testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+        decodeSchema.setSchema(genericSchema);
+        GenericSchema genericAvroSchema = GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo());
+        when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+                .thenReturn(genericAvroSchema.getSchemaInfo()); // NOTE(review): Avro-derived info in the JSON test; confirm intended
+        testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema); // helper decodes with an explicit schema version
     }
 
-    public void testEncodeAndDecodeGenericRecord(Schema<Foo> encodeSchema,
+    public void testAUTOEncodeAndDecodeGenericRecord(Schema<Foo> encodeSchema,
                                                  Schema<GenericRecord> decodeSchema) {
         int numRecords = 10;
         for (int i = 0; i < numRecords; i++) {
@@ -82,6 +98,38 @@ public class GenericSchemaImplTest {
 
             log.info("Decoding : {}", new String(data, UTF_8));
 
+            GenericRecord record = decodeSchema.decode(data, new byte[10]);
+            Object field1 = record.getField("field1");
+            assertEquals("field-1-" + i, field1, "Field 1 is " + field1.getClass());
+            Object field2 = record.getField("field2");
+            assertEquals("field-2-" + i, field2, "Field 2 is " + field2.getClass());
+            Object field3 = record.getField("field3");
+            assertEquals(i, field3, "Field 3 is " + field3.getClass());
+            Object field4 = record.getField("field4");
+            assertTrue(field4 instanceof GenericRecord);
+            GenericRecord field4Record = (GenericRecord) field4;
+            assertEquals(i % 2 == 0, field4Record.getField("field1"));
+            Object fieldUnableNull = record.getField("fieldUnableNull");
+            assertEquals("fieldUnableNull-1-" + i, fieldUnableNull, "fieldUnableNull 1 is " + fieldUnableNull.getClass());
+        }
+    }
+
+    public void testEncodeAndDecodeGenericRecord(Schema<Foo> encodeSchema,
+                                                     Schema<GenericRecord> decodeSchema) {
+        int numRecords = 10;
+        for (int i = 0; i < numRecords; i++) {
+            Foo foo = new Foo();
+            foo.setField1("field-1-" + i);
+            foo.setField2("field-2-" + i);
+            foo.setField3(i);
+            Bar bar = new Bar();
+            bar.setField1(i % 2 == 0);
+            foo.setField4(bar);
+            foo.setFieldUnableNull("fieldUnableNull-1-" + i);
+            byte[] data = encodeSchema.encode(foo);
+
+            log.info("Decoding : {}", new String(data, UTF_8));
+
             GenericRecord record = decodeSchema.decode(data);
             Object field1 = record.getField("field1");
             assertEquals("field-1-" + i, field1, "Field 1 is " + field1.getClass());
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaTest.java
deleted file mode 100644
index d2bce9d..0000000
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl.schema.generic;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertSame;
-import static org.testng.Assert.assertTrue;
-
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * Unit test {@link MultiVersionGenericSchema}.
- */
-public class MultiVersionGenericSchemaTest {
-
-    private SchemaProvider<GenericRecord> schemaProvider;
-    private MultiVersionGenericSchema schema;
-
-    @BeforeMethod
-    public void setup() {
-        this.schemaProvider = mock(SchemaProvider.class);
-        this.schema = new MultiVersionGenericSchema(schemaProvider);
-    }
-
-    @Test(expectedExceptions = UnsupportedOperationException.class)
-    public void testEncode() {
-        this.schema.encode(mock(GenericRecord.class));
-    }
-
-    @Test
-    public void testSupportSchemaVersioning() {
-        assertTrue(schema.supportSchemaVersioning());
-    }
-
-    @Test
-    public void testGetSchemaInfo() {
-        assertEquals(new byte[0], schema.getSchemaInfo().getSchema());
-    }
-
-    @Test
-    public void testDecode() {
-        Schema<GenericRecord> mockSchema = mock(Schema.class);
-        when(schemaProvider.getSchema(any(byte[].class)))
-            .thenReturn(mockSchema);
-        when(schemaProvider.getSchema(eq(null)))
-            .thenReturn(mockSchema);
-
-        GenericRecord mockRecord = mock(GenericRecord.class);
-        when(mockSchema.decode(any(byte[].class), any(byte[].class)))
-            .thenReturn(mockRecord);
-        when(mockSchema.decode(any(byte[].class)))
-            .thenReturn(mockRecord);
-
-        assertSame(
-            mockRecord, schema.decode(new byte[0]));
-        verify(mockSchema, times(1))
-            .decode(any(byte[].class));
-
-        assertSame(
-            mockRecord, schema.decode(new byte[0], new byte[0]));
-        verify(mockSchema, times(1))
-            .decode(any(byte[].class), any(byte[].class));
-    }
-
-}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
similarity index 85%
rename from pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java
rename to pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
index aedc0bd..ed14580 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
@@ -29,7 +29,6 @@ import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
-import org.apache.pulsar.client.tutorial.JsonPojo;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.testng.annotations.BeforeMethod;
@@ -39,17 +38,17 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Unit test for {@link MultiVersionGenericSchemaProvider}.
+ * Unit test for {@link MultiVersionSchemaInfoProvider}.
  */
-public class MultiVersionGenericSchemaProviderTest {
+public class MultiVersionSchemaInfoProviderTest {
 
-    private MultiVersionGenericSchemaProvider schemaProvider;
+    private MultiVersionSchemaInfoProvider schemaProvider;
 
     @BeforeMethod
     public void setup() {
         PulsarClientImpl client = mock(PulsarClientImpl.class);
         when(client.getLookup()).thenReturn(mock(LookupService.class));
-        schemaProvider = new MultiVersionGenericSchemaProvider(
+        schemaProvider = new MultiVersionSchemaInfoProvider(
                 TopicName.get("persistent://public/default/my-topic"), client);
     }
 
@@ -63,7 +62,7 @@ public class MultiVersionGenericSchemaProviderTest {
                         any(TopicName.class),
                         any(byte[].class)))
                 .thenReturn(completableFuture);
-        GenericSchema schema = schemaProvider.getSchema(new byte[0]);
-        assertEquals(schema.getSchemaInfo(), schemaInfo);
+        SchemaInfo schemaInfoByVersion = schemaProvider.getSchemaByVersion(new byte[0]);
+        assertEquals(schemaInfoByVersion, schemaInfo);
     }
 }
diff --git a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
index 984f658..58388ef 100644
--- a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
+++ b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.io.hbase.sink;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -31,10 +29,12 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.api.Record;
@@ -52,12 +52,15 @@ import java.util.List;
 import java.util.Map;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * hbase Sink test
  */
 @Slf4j
 public class HbaseGenericRecordSinkTest {
+    private Message<GenericRecord> message;
+
 
     /**
      * A Simple class to test hbase class
@@ -84,6 +87,9 @@ public class HbaseGenericRecordSinkTest {
 
     @Test(enabled = false)
     public void TestOpenAndWriteSink() throws Exception {
+        message = mock(MessageImpl.class);
+        GenericSchema<GenericRecord> genericAvroSchema;
+
         Map<String, Object> map = new HashMap<>();
         map.put("zookeeperQuorum", "localhost");
         map.put("zookeeperClientPort", "2181");
@@ -112,13 +118,12 @@ public class HbaseGenericRecordSinkTest {
         AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
 
         byte[] bytes = schema.encode(obj);
-        ByteBuf payload = Unpooled.copiedBuffer(bytes);
         AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
         autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
 
         PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();
         Consumer consumer = mock(Consumer.class);
-        Message<GenericRecord> message = new MessageImpl("fake_topic_name", "11:111", map, payload, autoConsumeSchema);
+
         Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
             .message(message)
             .topicName("fake_topic_name")
@@ -136,6 +141,11 @@ public class HbaseGenericRecordSinkTest {
             })
             .build();
 
+        genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+        when(message.getValue())
+                .thenReturn(genericAvroSchema.decode(bytes));
+
         log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
                 obj.toString(),
                 message.getValue().toString(),
diff --git a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
index c95a9ed..fe8941e 100644
--- a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
+++ b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
@@ -27,9 +27,11 @@ import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
@@ -57,6 +59,8 @@ import static org.mockito.Mockito.when;
 @Slf4j
 public class InfluxDBGenericRecordSinkTest {
 
+    private Message<GenericRecord> message;
+
     /**
      * A Simple class to test InfluxDB class
      */
@@ -107,6 +111,8 @@ public class InfluxDBGenericRecordSinkTest {
 
     @Test
     public void testOpenAndWrite() throws Exception {
+        message = mock(MessageImpl.class);
+        GenericSchema<GenericRecord> genericAvroSchema;
         // prepare a cpu Record
         Cpu cpu = new Cpu();
         cpu.setMeasurement("cpu");
@@ -121,17 +127,19 @@ public class InfluxDBGenericRecordSinkTest {
         AvroSchema<Cpu> schema = AvroSchema.of(Cpu.class);
 
         byte[] bytes = schema.encode(cpu);
-        ByteBuf payload = Unpooled.copiedBuffer(bytes);
         AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
         autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
 
-        Message<GenericRecord> message = new MessageImpl("influx_cpu", "77:777",
-            configMap, payload, autoConsumeSchema);
         Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
             .message(message)
             .topicName("influx_cpu")
             .build();
 
+        genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+        when(message.getValue())
+                .thenReturn(genericAvroSchema.decode(bytes));
+
         log.info("cpu:{}, Message.getValue: {}, record.getValue: {}",
             cpu.toString(),
             message.getValue().toString(),
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index 9ecc91a..aec9d7e 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -20,8 +20,7 @@
 package org.apache.pulsar.io.jdbc;
 
 import com.google.common.collect.Maps;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
+
 import java.util.Map;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -29,11 +28,11 @@ import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.testng.Assert;
@@ -41,12 +40,16 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * Jdbc Sink test
  */
 @Slf4j
 public class JdbcSinkTest {
     private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName());
+    private Message<GenericRecord> message;
 
     /**
      * A Simple class to test jdbc class
@@ -72,9 +75,11 @@ public class JdbcSinkTest {
 
     @Test
     public void TestOpenAndWriteSink() throws Exception {
+        message = mock(MessageImpl.class);
         JdbcAutoSchemaSink jdbcSink;
         Map<String, Object> conf;
         String tableName = "TestOpenAndWriteSink";
+        GenericSchema<GenericRecord> genericAvroSchema;
 
         String jdbcUrl = sqliteUtils.sqliteUri();
         conf = Maps.newHashMap();
@@ -99,16 +104,16 @@ public class JdbcSinkTest {
         AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
 
         byte[] bytes = schema.encode(obj);
-        ByteBuf payload = Unpooled.copiedBuffer(bytes);
-        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
-        autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
 
-        Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", conf, payload, autoConsumeSchema);
         Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
             .message(message)
             .topicName("fake_topic_name")
             .build();
 
+        genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+        when(message.getValue())
+                .thenReturn(genericAvroSchema.decode(bytes));
         log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
             obj.toString(),
             message.getValue().toString(),
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
index 72462e7..92d1f4f 100644
--- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
+++ b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
@@ -18,17 +18,17 @@
  */
 package org.apache.pulsar.io.solr;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
@@ -39,6 +39,9 @@ import org.testng.annotations.Test;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * solr Sink test
  */
@@ -46,6 +49,7 @@ import java.util.Map;
 public class SolrGenericRecordSinkTest {
 
     private SolrServerUtil solrServerUtil;
+    private Message<GenericRecord> message;
 
     /**
      * A Simple class to test solr class
@@ -71,6 +75,7 @@ public class SolrGenericRecordSinkTest {
 
     @Test
     public void TestOpenAndWriteSink() throws Exception {
+        message = mock(MessageImpl.class);
         Map<String, Object> configs = new HashMap<>();
         configs.put("solrUrl", "http://localhost:8983/solr");
         configs.put("solrMode", "Standalone");
@@ -78,6 +83,7 @@ public class SolrGenericRecordSinkTest {
         configs.put("solrCommitWithinMs", "100");
         configs.put("username", "");
         configs.put("password", "");
+        GenericSchema<GenericRecord> genericAvroSchema;
 
         SolrGenericRecordSink sink = new SolrGenericRecordSink();
 
@@ -88,16 +94,19 @@ public class SolrGenericRecordSinkTest {
         AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
 
         byte[] bytes = schema.encode(obj);
-        ByteBuf payload = Unpooled.copiedBuffer(bytes);
         AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
         autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
 
-        Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", configs, payload, autoConsumeSchema);
         Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
             .message(message)
             .topicName("fake_topic_name")
             .build();
 
+        genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+        when(message.getValue())
+                .thenReturn(genericAvroSchema.decode(bytes));
+
         log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
             obj.toString(),
             message.getValue().toString(),
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
index fab2542..12b8122 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
@@ -19,16 +19,17 @@
 package org.apache.pulsar.tests.integration.schema;
 
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static org.testng.Assert.assertEquals;
 
 import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.tests.integration.schema.Schemas.Person;
+import org.apache.pulsar.tests.integration.schema.Schemas.PersonConsumeSchema;
 import org.apache.pulsar.tests.integration.schema.Schemas.Student;
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.testng.annotations.BeforeMethod;
@@ -106,5 +107,54 @@ public class SchemaTest extends PulsarTestSuite {
         }
     }
 
+    /**
+     * Publishes a {@code Person}, then consumes it as {@code PersonConsumeSchema}; the reader-only
+     * {@code gender} field is expected to come back as its {@code @AvroDefault} value, "male".
+     */
+    @Test
+    public void testMultiVersionSchema() throws Exception {
+        final String namespace = "test-namespace-" + randomName(16);
+        final String fqtn = TopicName.get(TopicDomain.persistent.value(),
+                PUBLIC_TENANT, namespace, "test-multi-version-schema").toString();
+
+        admin.namespaces().createNamespace(
+                PUBLIC_TENANT + "/" + namespace,
+                Sets.newHashSet(pulsarCluster.getClusterName()));
+
+        // Create a topic with `Person`
+        try (Producer<Person> producer = client.newProducer(Schema.AVRO(
+                SchemaDefinition.<Person>builder()
+                        .withAlwaysAllowNull(false).withSupportSchemaVersioning(true)
+                        .withPojo(Person.class).build()))
+                .topic(fqtn)
+                .create()) {
+            Person person = new Person();
+            person.setName("Tom Hanks");
+            person.setAge(60);
+
+            producer.send(person);
+
+            log.info("Successfully published person : {}", person);
+        }
+
+        // Consume with the evolved schema. subscribe() requires a subscription name, and since the
+        // message was published before this subscription existed, reading must start at Earliest.
+        try (Consumer<PersonConsumeSchema> consumer = client.newConsumer(Schema.AVRO(
+                SchemaDefinition.<PersonConsumeSchema>builder()
+                        .withAlwaysAllowNull(false).withSupportSchemaVersioning(true)
+                        .withPojo(PersonConsumeSchema.class).build()))
+                .topic(fqtn)
+                .subscriptionName("test-multi-version-schema-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe()) {
+            PersonConsumeSchema personConsumeSchema = consumer.receive().getValue();
+
+            // TestNG's assertEquals takes (actual, expected).
+            assertEquals(personConsumeSchema.getName(), "Tom Hanks");
+            assertEquals(personConsumeSchema.getAge(), 60);
+            assertEquals(personConsumeSchema.getGender(), "male");
+            log.info("Successfully consumer personConsumeSchema : {}", personConsumeSchema);
+        }
+    }
 
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
index ebe798d..2c4af95 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
@@ -36,6 +36,7 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
+import org.apache.avro.reflect.AvroDefault;
 
 /**
  * Keep a list of schemas for testing.
@@ -58,6 +59,23 @@ public final class Schemas {
     }
 
     /**
+     * A Person struct with name, age, and a gender field that declares an Avro default value.
+     */
+    @Data
+    @Getter
+    @Setter
+    @ToString
+    @EqualsAndHashCode
+    public static class PersonConsumeSchema {
+
+        private String name;
+        private int age;
+        @AvroDefault("\"male\"") // default is given as JSON text, i.e. the quoted string "male"
+        private String gender;
+
+    }
+
+    /**
      * A Student Struct.
      */
     @Data