You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/05/19 06:23:09 UTC

[pulsar] branch master updated: Support multiple Avro schema version in Pulsar SQL (#4847)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 097108a  Support multiple Avro schema version in Pulsar SQL (#4847)
097108a is described below

commit 097108a7abae750a239abb606d2a31442319e741
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue May 19 14:22:04 2020 +0800

    Support multiple Avro schema version in Pulsar SQL (#4847)
    
    Support multiple Avro schema version in Pulsar SQL
---
 .../pulsar/broker/admin/v2/SchemasResource.java    |   2 +-
 .../service/schema/BookkeeperSchemaStorage.java    |   1 +
 .../service/schema/SchemaRegistryServiceImpl.java  |   1 +
 .../schema/BookkeeperSchemaStorageTest.java        |   1 +
 .../broker/service/schema/SchemaServiceTest.java   |   1 +
 .../apache/pulsar/client/api/SimpleSchemaTest.java |   2 +-
 .../pulsar/client/impl/schema/AbstractSchema.java  |   6 +-
 .../pulsar/client/impl/schema/StructSchema.java    |   8 +-
 .../apache/pulsar/common/api/raw/RawMessage.java   |   8 +-
 .../pulsar/common/api/raw/RawMessageImpl.java      |   8 ++
 .../pulsar/common}/schema/LongSchemaVersion.java   |   5 +-
 .../apache/pulsar/common/schema/package-info.java  |  16 +--
 .../pulsar/sql/presto/AvroSchemaHandler.java       | 101 ++++++++--------
 .../pulsar/sql/presto/JSONSchemaHandler.java       |   5 -
 .../pulsar/sql/presto/KeyValueSchemaHandler.java   |  32 ++++--
 .../sql/presto/PulsarPrimitiveSchemaHandler.java   |  21 ++--
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  24 ++--
 .../pulsar/sql/presto/PulsarSchemaHandlers.java    |  36 +++---
 .../sql/presto/PulsarSqlSchemaInfoProvider.java    | 104 +++++++++++++++++
 .../apache/pulsar/sql/presto/SchemaHandler.java    |  15 ++-
 .../pulsar/sql/presto/TestAvroSchemaHandler.java   | 127 +++++++++++++++++++++
 .../presto/TestPulsarKeyValueSchemaHandler.java    |  46 ++++++--
 .../presto/TestPulsarPrimitiveSchemaHandler.java   |   2 +
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  |  13 +++
 24 files changed, 446 insertions(+), 139 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index 868c7ec..54221dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -54,7 +54,7 @@ import javax.ws.rs.core.Response;
 
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
-import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index b013c8b..05a9911 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.common.protocol.schema.StoredSchema;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.zookeeper.CreateMode;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 827f0d8..28ac6da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaHash;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 import org.apache.pulsar.common.protocol.schema.StoredSchema;
 import org.apache.pulsar.common.schema.SchemaType;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
index e22462e..9d2750d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.schema;
 import java.nio.ByteBuffer;
 import org.apache.bookkeeper.client.api.BKException;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
 import org.testng.annotations.Test;
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 00b09cb..d1452a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.testng.annotations.AfterMethod;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index 5548089..b3cd763 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -30,7 +30,7 @@ import static org.testng.Assert.assertTrue;
 
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.Schema.Parser;
-import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
index 1084328..255a491 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
@@ -22,7 +22,7 @@ import io.netty.buffer.ByteBuf;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 
-abstract class AbstractSchema<T> implements Schema<T> {
+public abstract class AbstractSchema<T> implements Schema<T> {
 
     /**
      * Check if the message read able length length is a valid object for this schema.
@@ -47,7 +47,7 @@ abstract class AbstractSchema<T> implements Schema<T> {
      *            the byte buffer to decode
      * @return the deserialized object
      */
-    abstract T decode(ByteBuf byteBuf);
+    public abstract T decode(ByteBuf byteBuf);
     /**
      * Decode a byteBuf into an object using a given version.
      *
@@ -57,7 +57,7 @@ abstract class AbstractSchema<T> implements Schema<T> {
      *            the schema version to decode the object. null indicates using latest version.
      * @return the deserialized object
      */
-    T decode(ByteBuf byteBuf, byte[] schemaVersion) {
+    public T decode(ByteBuf byteBuf, byte[] schemaVersion) {
         // ignore version by default (most of the primitive schema implementations ignore schema version)
         return decode(byteBuf);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index fc0608b..6fa344a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -103,7 +103,8 @@ public abstract class StructSchema<T> extends AbstractSchema<T> {
     @Override
     public T decode(byte[] bytes, byte[] schemaVersion) {
         try {
-            return readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
+            return schemaVersion == null ? decode(bytes) :
+                    readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
         } catch (ExecutionException | AvroTypeException e) {
             if (e instanceof AvroTypeException) {
                 throw new SchemaSerializationException(e);
@@ -122,7 +123,8 @@ public abstract class StructSchema<T> extends AbstractSchema<T> {
     @Override
     public T decode(ByteBuf byteBuf, byte[] schemaVersion) {
         try {
-            return readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(new ByteBufInputStream(byteBuf));
+            return schemaVersion == null ? decode(byteBuf) :
+                    readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(new ByteBufInputStream(byteBuf));
         } catch (ExecutionException e) {
             LOG.error("Can't get generic schema for topic {} schema version {}",
                     schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
@@ -180,7 +182,7 @@ public abstract class StructSchema<T> extends AbstractSchema<T> {
         return parser.parse(schemaJson);
     }
 
-    protected static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
+    public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
         return SchemaInfo.builder()
                 .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
                 .properties(schemaDefinition.getProperties())
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
index 07a8098..7e60455 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
@@ -103,6 +103,13 @@ public interface RawMessage {
     Optional<String> getKey();
 
     /**
+     * Get the schema verison of the message.
+     *
+     * @return the schema version of the message
+     */
+    byte[] getSchemaVersion();
+
+    /**
      * Get byteBuf of the key.
      *
      * @return the byte array with the key payload
@@ -115,5 +122,4 @@ public interface RawMessage {
      * @return true if the key is base64 encoded, false otherwise
      */
     boolean hasBase64EncodedKey();
-
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
index 371f47f..046349e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -143,6 +143,14 @@ public class RawMessageImpl implements RawMessage {
     }
 
     @Override
+    public byte[] getSchemaVersion() {
+        if (msgMetadata != null && msgMetadata.get().hasSchemaVersion()) {
+            return msgMetadata.get().getSchemaVersion().toByteArray();
+        } else {
+            return null;
+        }
+    }
+
     public Optional<ByteBuf> getKeyBytes() {
         if (getKey().isPresent()) {
             if (hasBase64EncodedKey()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LongSchemaVersion.java
similarity index 96%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/schema/LongSchemaVersion.java
index 7da2d87..7e95510 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LongSchemaVersion.java
@@ -16,13 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.service.schema;
+package org.apache.pulsar.common.schema;
 
 import com.google.common.base.MoreObjects;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 
+/**
+ * Long schema version.
+ */
 public class LongSchemaVersion implements SchemaVersion {
     private final long version;
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/package-info.java
similarity index 70%
copy from pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/schema/package-info.java
index 7529034..88f4f1c 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/package-info.java
@@ -16,19 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.sql.presto;
-
-import io.netty.buffer.ByteBuf;
-
 /**
- * This interface defines the methods to work with schemas.
+ * Implementation of the common of the pulsar schema.
  */
-public interface SchemaHandler {
-
-    Object deserialize(ByteBuf payload);
-
-    Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload);
-
-    Object extractField(int index, Object currentRecord);
-
-}
+package org.apache.pulsar.common.schema;
\ No newline at end of file
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
index 3e3685e..966dc07 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
@@ -18,92 +18,87 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import io.airlift.log.Logger;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.util.ReferenceCountUtil;
-import io.netty.util.concurrent.FastThreadLocal;
-import java.io.IOException;
+
 import java.util.List;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
 
 /**
  * Schema handler for payload in the Avro format.
  */
 public class AvroSchemaHandler implements SchemaHandler {
 
-    private final DatumReader<GenericRecord> datumReader;
-
     private final List<PulsarColumnHandle> columnHandles;
 
-    private static final FastThreadLocal<BinaryDecoder> decoders =
-            new FastThreadLocal<>();
+    private final GenericAvroSchema genericAvroSchema;
+
+    private final SchemaInfo schemaInfo;
 
     private static final Logger log = Logger.get(AvroSchemaHandler.class);
 
-    public AvroSchemaHandler(Schema schema, List<PulsarColumnHandle> columnHandles) {
-        this.datumReader = new GenericDatumReader<>(schema);
+    AvroSchemaHandler(TopicName topicName,
+                      PulsarConnectorConfig pulsarConnectorConfig,
+                      SchemaInfo schemaInfo,
+                      List<PulsarColumnHandle> columnHandles) throws PulsarClientException {
+        this(new PulsarSqlSchemaInfoProvider(topicName,
+                                pulsarConnectorConfig.getPulsarAdmin()), schemaInfo, columnHandles);
+    }
+
+    AvroSchemaHandler(PulsarSqlSchemaInfoProvider pulsarSqlSchemaInfoProvider,
+                      SchemaInfo schemaInfo, List<PulsarColumnHandle> columnHandles) {
+        this.schemaInfo = schemaInfo;
+        this.genericAvroSchema = new GenericAvroSchema(schemaInfo);
+        this.genericAvroSchema.setSchemaInfoProvider(pulsarSqlSchemaInfoProvider);
         this.columnHandles = columnHandles;
     }
 
     @Override
     public Object deserialize(ByteBuf payload) {
-
-        ByteBuf heapBuffer = null;
-        try {
-            BinaryDecoder decoderFromCache = decoders.get();
-
-            // Make a copy into a heap buffer, since Avro cannot deserialize directly from direct memory
-            int size = payload.readableBytes();
-            heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer(size, size);
-            heapBuffer.writeBytes(payload);
-
-            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(heapBuffer.array(), heapBuffer.arrayOffset(),
-                    heapBuffer.readableBytes(), decoderFromCache);
-            if (decoderFromCache == null) {
-                decoders.set(decoder);
-            }
-            return this.datumReader.read(null, decoder);
-        } catch (IOException e) {
-            log.error(e);
-        } finally {
-            ReferenceCountUtil.safeRelease(heapBuffer);
-        }
-        return null;
+        return genericAvroSchema.decode(payload);
     }
 
     @Override
-    public Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload) {
-        return null;
+    public Object deserialize(ByteBuf payload, byte[] schemaVersion) {
+        return genericAvroSchema.decode(payload, schemaVersion);
     }
 
     @Override
     public Object extractField(int index, Object currentRecord) {
         try {
-            GenericRecord record = (GenericRecord) currentRecord;
+            GenericAvroRecord record = (GenericAvroRecord) currentRecord;
             PulsarColumnHandle pulsarColumnHandle = this.columnHandles.get(index);
-            Integer[] positionIndices = pulsarColumnHandle.getPositionIndices();
-            Object curr = record.get(positionIndices[0]);
-            if (curr == null) {
-                return null;
-            }
-            if (positionIndices.length > 0) {
-                for (int i = 1; i < positionIndices.length; i++) {
-                    curr = ((GenericRecord) curr).get(positionIndices[i]);
-                    if (curr == null) {
-                        return null;
-                    }
+            String[] names = pulsarColumnHandle.getFieldNames();
+
+            if (names.length == 1) {
+                return record.getField(pulsarColumnHandle.getFieldNames()[0]);
+            } else {
+                for (int i = 0; i < names.length - 1; i++) {
+                    record = (GenericAvroRecord) record.getField(names[i]);
                 }
+                return record.getField(names[names.length - 1]);
             }
-            return curr;
         } catch (Exception ex) {
             log.debug(ex, "%s", ex);
         }
         return null;
     }
+
+    @VisibleForTesting
+    GenericAvroSchema getSchema() {
+        return this.genericAvroSchema;
+    }
+
+    @VisibleForTesting
+    SchemaInfo getSchemaInfo() {
+        return schemaInfo;
+    }
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
index 99863a3..8649e41 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
@@ -74,11 +74,6 @@ public class JSONSchemaHandler implements SchemaHandler {
     }
 
     @Override
-    public Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload) {
-        return null;
-    }
-
-    @Override
     public Object extractField(int index, Object currentRecord) {
         try {
             Map jsonObject = (Map) currentRecord;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java
index 2f056d3..434dd44 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.airlift.log.Logger;
 import io.netty.buffer.ByteBuf;
 import java.util.List;
 import java.util.Objects;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -43,21 +45,32 @@ public class KeyValueSchemaHandler implements SchemaHandler {
 
     private KeyValueEncodingType keyValueEncodingType;
 
-    public KeyValueSchemaHandler(SchemaInfo schemaInfo, List<PulsarColumnHandle> columnHandles) {
+    public KeyValueSchemaHandler(TopicName topicName,
+                                 PulsarConnectorConfig pulsarConnectorConfig,
+                                 SchemaInfo schemaInfo,
+                                 List<PulsarColumnHandle> columnHandles) {
         this.columnHandles = columnHandles;
         KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo = KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo);
-        keySchemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(kvSchemaInfo.getKey(), columnHandles);
-        valueSchemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(kvSchemaInfo.getValue(), columnHandles);
+        keySchemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(topicName, pulsarConnectorConfig,
+                kvSchemaInfo.getKey(), columnHandles);
+        valueSchemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(topicName, pulsarConnectorConfig,
+                kvSchemaInfo.getValue(), columnHandles);
         keyValueEncodingType = KeyValueSchemaInfo.decodeKeyValueEncodingType(schemaInfo);
     }
 
-    @Override
-    public Object deserialize(ByteBuf payload) {
-        return null;
+    @VisibleForTesting
+    KeyValueSchemaHandler(SchemaHandler keySchemaHandler,
+                          SchemaHandler valueSchemaHandler,
+                          List<PulsarColumnHandle> columnHandles) {
+        this.keySchemaHandler = keySchemaHandler;
+        this.valueSchemaHandler = valueSchemaHandler;
+        this.columnHandles = columnHandles;
     }
 
     @Override
-    public Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload) {
+    public Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload, byte[] schemaVersion) {
+        Object keyObj;
+        Object valueObj;
         ByteBuf keyByteBuf;
         ByteBuf valueByteBuf;
         if (Objects.equals(keyValueEncodingType, KeyValueEncodingType.INLINE)) {
@@ -71,8 +84,9 @@ public class KeyValueSchemaHandler implements SchemaHandler {
             keyByteBuf = keyPayload;
             valueByteBuf = dataPayload;
         }
-        Object keyObj = keySchemaHandler.deserialize(keyByteBuf);
-        Object valueObj = valueSchemaHandler.deserialize(valueByteBuf);
+
+        keyObj = keySchemaHandler.deserialize(keyByteBuf, schemaVersion);
+        valueObj = valueSchemaHandler.deserialize(valueByteBuf, schemaVersion);
         return new KeyValue<>(keyObj, valueObj);
     }
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java
index f26587b..354550f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java
@@ -19,11 +19,12 @@
 package org.apache.pulsar.sql.presto;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
+
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Date;
-import org.apache.pulsar.client.api.Schema;
+
+import org.apache.pulsar.client.impl.schema.AbstractSchema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
@@ -33,17 +34,16 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 public class PulsarPrimitiveSchemaHandler implements SchemaHandler {
 
     private final SchemaInfo schemaInfo;
-    private final Schema<?> schema;
+    private final AbstractSchema<?> schema;
 
-    public PulsarPrimitiveSchemaHandler(SchemaInfo schemaInfo) {
+    PulsarPrimitiveSchemaHandler(SchemaInfo schemaInfo) {
         this.schemaInfo = schemaInfo;
-        this.schema = AutoConsumeSchema.getSchema(schemaInfo);
+        this.schema = (AbstractSchema<?>) AutoConsumeSchema.getSchema(schemaInfo);
     }
 
     @Override
-    public Object deserialize(ByteBuf byteBuf) {
-        byte[] data = ByteBufUtil.getBytes(byteBuf);
-        Object currentRecord = schema.decode(data);
+    public Object deserialize(ByteBuf payload) {
+        Object currentRecord = schema.decode(payload);
         switch (schemaInfo.getType()) {
             case DATE:
                 return ((Date) currentRecord).getTime();
@@ -57,11 +57,6 @@ public class PulsarPrimitiveSchemaHandler implements SchemaHandler {
     }
 
     @Override
-    public Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload) {
-        return null;
-    }
-
-    @Override
     public Object extractField(int index, Object currentRecord) {
         return currentRecord;
     }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index d7e2e2f..a337189 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -155,10 +155,9 @@ public class PulsarRecordCursor implements RecordCursor {
         this.readOffloaded = pulsarConnectorConfig.getManagedLedgerOffloadDriver() != null;
         this.pulsarConnectorConfig = pulsarConnectorConfig;
 
-        this.schemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(
-                pulsarSplit.getSchemaInfo(),
-                columnHandles
-        );
+        this.schemaHandler = PulsarSchemaHandlers
+                .newPulsarSchemaHandler(this.topicName,
+                        this.pulsarConnectorConfig, pulsarSplit.getSchemaInfo(), columnHandles);
 
         log.info("Initializing split with parameters: %s", pulsarSplit);
 
@@ -426,9 +425,11 @@ public class PulsarRecordCursor implements RecordCursor {
             if (this.currentMessage.getKeyBytes().isPresent()) {
                 keyByteBuf = this.currentMessage.getKeyBytes().get();
             }
-            currentRecord = this.schemaHandler.deserialize(keyByteBuf, this.currentMessage.getData());
+            currentRecord = this.schemaHandler.deserialize(keyByteBuf,
+                    this.currentMessage.getData(), this.currentMessage.getSchemaVersion());
         } else {
-            currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData());
+            currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData(),
+                    this.currentMessage.getSchemaVersion());
         }
         metricsTracker.incr_NUM_RECORD_DESERIALIZED();
 
@@ -489,7 +490,11 @@ public class PulsarRecordCursor implements RecordCursor {
         } else if (type.equals(TIME)) {
             return ((Number) record).longValue();
         } else if (type.equals(TIMESTAMP)) {
-            return ((Number) record).longValue();
+            if (record instanceof String) {
+                return Long.parseLong((String) record);
+            } else {
+                return ((Number) record).longValue();
+            }
         } else if (type.equals(TIMESTAMP_WITH_TIME_ZONE)) {
             return packDateTimeWithZone(((Number) record).longValue(), 0);
         } else if (type.equals(TINYINT)) {
@@ -599,4 +604,9 @@ public class PulsarRecordCursor implements RecordCursor {
         Class<?> actual = getType(field).getJavaType();
         checkArgument(actual == expected, "Expected field %s to be type %s but is %s", field, expected, actual);
     }
+
+    @VisibleForTesting
+    SchemaHandler getSchemaHandler() {
+        return this.schemaHandler;
+    }
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
index 3b091ad..359f1af 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
@@ -19,39 +19,47 @@
 package org.apache.pulsar.sql.presto;
 
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
-import static java.nio.charset.StandardCharsets.UTF_8;
 
 import com.facebook.presto.spi.PrestoException;
 import java.util.List;
+
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
 class PulsarSchemaHandlers {
 
-    static SchemaHandler newPulsarSchemaHandler(SchemaInfo schemaInfo,
-                                                List<PulsarColumnHandle> columnHandles) {
+    static SchemaHandler newPulsarSchemaHandler(TopicName topicName,
+                                                PulsarConnectorConfig pulsarConnectorConfig,
+                                                SchemaInfo schemaInfo,
+                                                List<PulsarColumnHandle> columnHandles) throws RuntimeException{
         if (schemaInfo.getType().isPrimitive()) {
             return new PulsarPrimitiveSchemaHandler(schemaInfo);
         } else if (schemaInfo.getType().isStruct()) {
-            switch (schemaInfo.getType()) {
-                case JSON:
-                    return new JSONSchemaHandler(columnHandles);
-                case AVRO:
-                    return new AvroSchemaHandler(PulsarConnectorUtils
-                            .parseSchema(new String(schemaInfo.getSchema(), UTF_8)
-                    ), columnHandles);
-                default:
-                    throw new PrestoException(NOT_SUPPORTED, "Not supported schema type: " + schemaInfo.getType());
+            try {
+                switch (schemaInfo.getType()) {
+                    case JSON:
+                        return new JSONSchemaHandler(columnHandles);
+                    case AVRO:
+                        return new AvroSchemaHandler(topicName, pulsarConnectorConfig, schemaInfo, columnHandles);
+                    default:
+                        throw new PrestoException(NOT_SUPPORTED, "Not supported schema type: " + schemaInfo.getType());
+                }
+            } catch (PulsarClientException e) {
+                throw new RuntimeException(
+                        new Throwable("PulsarAdmin gets version schema fail, topicName : "
+                                + topicName.toString(), e));
             }
-
         } else if (schemaInfo.getType().equals(SchemaType.KEY_VALUE)) {
-            return new KeyValueSchemaHandler(schemaInfo, columnHandles);
+            return new KeyValueSchemaHandler(topicName, pulsarConnectorConfig, schemaInfo, columnHandles);
         } else {
             throw new PrestoException(
                     NOT_SUPPORTED,
                     "Schema `" + schemaInfo.getType() + "` is not supported by presto yet : " + schemaInfo);
         }
+
     }
 
     static SchemaInfo defaultSchema() {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
new file mode 100644
index 0000000..c88d7e7
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Multi version schema info provider for Pulsar SQL leverage guava cache.
+ */
+public class PulsarSqlSchemaInfoProvider implements SchemaInfoProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarSqlSchemaInfoProvider.class);
+
+    private final TopicName topicName;
+
+    private final PulsarAdmin pulsarAdmin;
+
+    private final LoadingCache<BytesSchemaVersion, SchemaInfo> cache = CacheBuilder.newBuilder().maximumSize(100000)
+            .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<BytesSchemaVersion, SchemaInfo>() {
+                @Override
+                public SchemaInfo load(BytesSchemaVersion schemaVersion) throws Exception {
+                    return loadSchema(schemaVersion);
+                }
+            });
+
+    PulsarSqlSchemaInfoProvider(TopicName topicName, PulsarAdmin pulsarAdmin) {
+        this.topicName = topicName;
+        this.pulsarAdmin = pulsarAdmin;
+    }
+
+    @Override
+    public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
+        try {
+            if (null == schemaVersion) {
+                return completedFuture(null);
+            }
+            return completedFuture(cache.get(BytesSchemaVersion.of(schemaVersion)));
+        } catch (ExecutionException e) {
+            LOG.error("Can't get generic schema for topic {} schema version {}",
+                    topicName.toString(), new String(schemaVersion, StandardCharsets.UTF_8), e);
+            return FutureUtil.failedFuture(e.getCause());
+        }
+    }
+
+    @Override
+    public CompletableFuture<SchemaInfo> getLatestSchema() {
+        try {
+            return completedFuture(pulsarAdmin.schemas()
+                    .getSchemaInfo(topicName.toString()));
+        } catch (PulsarAdminException e) {
+            LOG.error("Can't get current schema for topic {}",
+                    topicName.toString(), e);
+            return FutureUtil.failedFuture(e.getCause());
+        }
+    }
+
+    @Override
+    public String getTopicName() {
+        return topicName.getLocalName();
+    }
+
+    private SchemaInfo loadSchema(BytesSchemaVersion bytesSchemaVersion) throws PulsarAdminException {
+        return pulsarAdmin.schemas()
+                .getSchemaInfo(topicName.toString(), ByteBuffer.wrap(bytesSchemaVersion.get()).getLong());
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
index 7529034..050eec9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
@@ -25,9 +25,20 @@ import io.netty.buffer.ByteBuf;
  */
 public interface SchemaHandler {
 
-    Object deserialize(ByteBuf payload);
+    default Object deserialize(ByteBuf payload) {
+        return null;
+    }
 
-    Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload);
+    // if schemaHandler don't support multi version, we will use method deserialize(ByteBuf payload)
+    default Object deserialize(ByteBuf byteBuf, byte[] schemaVersion) {
+        return deserialize(byteBuf);
+    }
+
+    // if SchemaHandler don't support key value multi version
+    // we will use method deserialize(ByteBuf byteBuf, byte[] schemaVersion)
+    default Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload, byte[] schemaVersion) {
+        return deserialize(dataPayload, schemaVersion);
+    }
 
     Object extractField(int index, Object currentRecord);
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestAvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestAvroSchemaHandler.java
new file mode 100644
index 0000000..9871264
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestAvroSchemaHandler.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import com.facebook.presto.spi.type.BigintType;
+import io.netty.buffer.ByteBufAllocator;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.StructSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import static org.mockito.Mockito.mock;
+
+@Slf4j
+public class TestAvroSchemaHandler {
+
+    @Data
+    public static class Foo1 {
+        String field1;
+
+        Bar bar;
+    }
+    @Data
+    public static class Foo2 {
+        String field1;
+
+        String field2;
+
+        Bar bar;
+    }
+
+    @Data static class Bar {
+        String field1;
+
+        String field2;
+    }
+
+    @Test
+    public void testAvroSchemaHandler() throws IOException {
+        List<PulsarColumnHandle> columnHandles = new ArrayList();
+        RawMessage message = mock(RawMessage.class);
+        Schema schema1 = ReflectData.AllowNull.get().getSchema(Foo1.class);
+        PulsarSqlSchemaInfoProvider pulsarSqlSchemaInfoProvider = mock(PulsarSqlSchemaInfoProvider.class);
+        AvroSchemaHandler avroSchemaHandler = new AvroSchemaHandler(pulsarSqlSchemaInfoProvider,
+                StructSchema.parseSchemaInfo(SchemaDefinition.builder().withPojo(Foo2.class).build(), SchemaType.AVRO), columnHandles);
+        byte[] schemaVersion = new byte[8];
+        for (int i = 0 ; i<8; i++) {
+            schemaVersion[i] = 0;
+        }
+        ReflectDatumWriter<Foo1> writer;
+        BinaryEncoder encoder = null;
+        ByteArrayOutputStream byteArrayOutputStream;
+        byteArrayOutputStream = new ByteArrayOutputStream();
+        encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, encoder);
+        writer = new ReflectDatumWriter<>(schema1);
+        Foo1 foo1 = new Foo1();
+        foo1.setField1("value1");
+        foo1.setBar(new Bar());
+        foo1.getBar().setField1("value1");
+        writer.write(foo1, encoder);
+        encoder.flush();
+        when(message.getSchemaVersion()).thenReturn(schemaVersion);
+        byte[] bytes =byteArrayOutputStream.toByteArray();
+
+        when(message.getData()).thenReturn(ByteBufAllocator.DEFAULT
+                .buffer(bytes.length, bytes.length).writeBytes(byteArrayOutputStream.toByteArray()));
+        when(pulsarSqlSchemaInfoProvider.getSchemaByVersion(any()))
+                .thenReturn(completedFuture(StructSchema.parseSchemaInfo(SchemaDefinition.builder()
+                        .withPojo(Foo1.class).build(), SchemaType.AVRO)));
+
+        Object object  = ((GenericAvroRecord)avroSchemaHandler.deserialize(message.getData(),
+                message.getSchemaVersion())).getField("field1");
+        Assert.assertEquals(foo1.field1, (String)object);
+        String[] fields = new String[2];
+        fields[0] = "bar";
+        fields[1] = "field1";
+        PulsarColumnHandle pulsarColumnHandle = new PulsarColumnHandle("test",
+                "bar.field1",
+                BigintType.BIGINT,
+                true,
+                true,
+                fields,
+                new Integer[5],
+                null);
+        columnHandles.add(pulsarColumnHandle);
+        when(message.getData()).thenReturn(ByteBufAllocator.DEFAULT
+                .buffer(bytes.length, bytes.length).writeBytes(byteArrayOutputStream.toByteArray()));
+        object = avroSchemaHandler.extractField(0, avroSchemaHandler.deserialize(message.getData(),
+                message.getSchemaVersion()));
+        Assert.assertEquals(foo1.bar.field1, (String)object);
+    }
+} 
\ No newline at end of file
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java
index 193f68e..5e08c84 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java
@@ -37,10 +37,13 @@ import org.apache.pulsar.common.api.raw.RawMessageImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.mock;
+
 
 /**
  * Unit test for KeyValueSchemaHandler
@@ -105,15 +108,15 @@ public class TestPulsarKeyValueSchemaHandler {
         List<PulsarColumnHandle> columnHandleList = getColumnHandlerList(columnMetadataList);
 
         KeyValueSchemaHandler keyValueSchemaHandler =
-                new KeyValueSchemaHandler(schema1.getSchemaInfo(), columnHandleList);
+                new KeyValueSchemaHandler(null, null,schema1.getSchemaInfo(), columnHandleList);
 
-        RawMessageImpl message = Mockito.mock(RawMessageImpl.class);
+        RawMessageImpl message = mock(RawMessageImpl.class);
         Mockito.when(message.getData()).thenReturn(
                 Unpooled.wrappedBuffer(schema1.encode(new KeyValue<>(keyData, valueData)))
         );
 
         KeyValue<ByteBuf, ByteBuf> byteBufKeyValue = getKeyValueByteBuf(message, schema1);
-        Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue());
+        Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue(), null);
         Assert.assertEquals(keyValueSchemaHandler.extractField(0, object), keyData);
         Assert.assertEquals(keyValueSchemaHandler.extractField(1, object), valueData);
     }
@@ -140,17 +143,17 @@ public class TestPulsarKeyValueSchemaHandler {
 
         List<PulsarColumnHandle> columnHandleList = getColumnHandlerList(columnMetadataList);
 
-        RawMessage message = Mockito.mock(RawMessage.class);
+        RawMessage message = mock(RawMessage.class);
         Mockito.when(message.getData()).thenReturn(
                 Unpooled.wrappedBuffer(schema2.encode(new KeyValue<>(keyData, foo)))
         );
 
 
         KeyValueSchemaHandler keyValueSchemaHandler =
-                new KeyValueSchemaHandler(schema2.getSchemaInfo(), columnHandleList);
+                new KeyValueSchemaHandler(null, null, schema2.getSchemaInfo(), columnHandleList);
 
         KeyValue<ByteBuf, ByteBuf> byteBufKeyValue = getKeyValueByteBuf(message, schema2);
-        Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue());
+        Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue(), null);
         Assert.assertEquals(keyValueSchemaHandler.extractField(0, object), keyData);
         Assert.assertEquals(keyValueSchemaHandler.extractField(1, object),
                 foo.getValue(columnHandleList.get(1).getName()));
@@ -184,10 +187,19 @@ public class TestPulsarKeyValueSchemaHandler {
 
         List<PulsarColumnHandle> columnHandleList = getColumnHandlerList(columnMetadataList);
 
+        PulsarSqlSchemaInfoProvider pulsarSqlSchemaInfoProvider = mock(PulsarSqlSchemaInfoProvider.class);
+
+        KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo =
+                KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schema3.getSchemaInfo());
+
+        AvroSchemaHandler avroSchemaHandler =
+                new AvroSchemaHandler(pulsarSqlSchemaInfoProvider, kvSchemaInfo.getKey(), columnHandleList);
+        PulsarPrimitiveSchemaHandler pulsarPrimitiveSchemaHandler =
+                new PulsarPrimitiveSchemaHandler(kvSchemaInfo.getValue());
         KeyValueSchemaHandler keyValueSchemaHandler =
-                new KeyValueSchemaHandler(schema3.getSchemaInfo(), columnHandleList);
+                new KeyValueSchemaHandler(avroSchemaHandler, pulsarPrimitiveSchemaHandler, columnHandleList);
 
-        RawMessage message = Mockito.mock(RawMessage.class);
+        RawMessage message = mock(RawMessage.class);
         Mockito.when(message.getKeyBytes()).thenReturn(
                 Optional.of(Unpooled.wrappedBuffer(
                     ((KeyValueSchema) schema3).getKeySchema().encode(boo)
@@ -198,7 +210,8 @@ public class TestPulsarKeyValueSchemaHandler {
         );
 
         KeyValue<ByteBuf, ByteBuf> byteBufKeyValue = getKeyValueByteBuf(message, schema3);
-        Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue());
+        Integer a = 1;
+        Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue(), null);
         Assert.assertEquals(keyValueSchemaHandler.extractField(0, object).toString(),
                 boo.getValue(columnHandleList.get(0).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH)));
         Assert.assertEquals(keyValueSchemaHandler.extractField(1, object),
@@ -228,10 +241,19 @@ public class TestPulsarKeyValueSchemaHandler {
 
         List<PulsarColumnHandle> columnHandleList = getColumnHandlerList(columnMetadataList);
 
+        PulsarSqlSchemaInfoProvider pulsarSqlSchemaInfoProvider = mock(PulsarSqlSchemaInfoProvider.class);
+
+        KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo =
+                KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schema4.getSchemaInfo());
+
+        AvroSchemaHandler avroSchemaHandler =
+                new AvroSchemaHandler(pulsarSqlSchemaInfoProvider, kvSchemaInfo.getValue(), columnHandleList);
+        JSONSchemaHandler jsonSchemaHandler = new JSONSchemaHandler(columnHandleList);
         KeyValueSchemaHandler keyValueSchemaHandler =
-                new KeyValueSchemaHandler(schema4.getSchemaInfo(), columnHandleList);
+                new KeyValueSchemaHandler(jsonSchemaHandler, avroSchemaHandler, columnHandleList);
+
 
-        RawMessage message = Mockito.mock(RawMessage.class);
+        RawMessage message = mock(RawMessage.class);
         Mockito.when(message.getKeyBytes()).thenReturn(
                 Optional.of(Unpooled.wrappedBuffer(
                         ((KeyValueSchema) schema4).getKeySchema().encode(boo)
@@ -242,7 +264,7 @@ public class TestPulsarKeyValueSchemaHandler {
         );
 
         KeyValue<ByteBuf, ByteBuf> byteBufKeyValue = getKeyValueByteBuf(message, schema4);
-        Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue());
+        Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue(), null);
         Assert.assertEquals(keyValueSchemaHandler.extractField(0, object).toString(),
                 boo.getValue(columnHandleList.get(0).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH)));
         Assert.assertEquals(keyValueSchemaHandler.extractField(1, object),
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
index ac2e1f4..307c299 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
@@ -138,6 +138,8 @@ public class TestPulsarPrimitiveSchemaHandler {
     public void testNewPulsarPrimitiveSchemaHandler() {
         RawMessage rawMessage = mock(RawMessage.class);
         SchemaHandler schemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(
+                null,
+                null,
                 StringSchema.utf8().getSchemaInfo(),
                 null);
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index c2a3371..9e38e25 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -33,6 +33,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -51,6 +55,15 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
             log.info("!------ topic %s ------!", entry.getKey());
             setup();
             PulsarRecordCursor pulsarRecordCursor = entry.getValue();
+
+            SchemaHandler schemaHandler = pulsarRecordCursor.getSchemaHandler();
+            PulsarSqlSchemaInfoProvider pulsarSqlSchemaInfoProvider = mock(PulsarSqlSchemaInfoProvider.class);
+            if (schemaHandler instanceof AvroSchemaHandler) {
+                AvroSchemaHandler avroSchemaHandler = (AvroSchemaHandler) schemaHandler;
+                avroSchemaHandler.getSchema().setSchemaInfoProvider(pulsarSqlSchemaInfoProvider);
+                when(pulsarSqlSchemaInfoProvider.getSchemaByVersion(any())).thenReturn(completedFuture(avroSchemaHandler.getSchemaInfo()));
+            }
+
             TopicName topicName = entry.getKey();
 
             int count = 0;