You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2023/01/08 08:45:44 UTC

[flink-connector-pulsar] branch main updated: [FLINK-25686][Connector/Pulsar] Support schema evolution for Pulsar source and sink. (#12)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
     new 99f4ce4  [FLINK-25686][Connector/Pulsar] Support schema evolution for Pulsar source and sink. (#12)
99f4ce4 is described below

commit 99f4ce47796225ebb1b0be33b59af85c1ae0424a
Author: Yufan Sheng <sy...@gmail.com>
AuthorDate: Sun Jan 8 16:45:39 2023 +0800

    [FLINK-25686][Connector/Pulsar] Support schema evolution for Pulsar source and sink. (#12)
---
 .../docs/connectors/datastream/pulsar.md           |   2 +-
 docs/content/docs/connectors/datastream/pulsar.md  |   2 +
 .../generated/pulsar_sink_configuration.html       |   2 +-
 .../generated/pulsar_source_configuration.html     |   6 +
 flink-connector-pulsar/pom.xml                     |  16 ++
 .../pulsar/common/schema/BytesSchema.java          |  82 +++++++
 .../common/schema/PulsarSchemaTypeSerializer.java  |   5 +
 .../common/schema/factories/AvroSchemaFactory.java |  13 +
 .../common/schema/factories/JSONSchemaFactory.java |  13 +
 .../schema/factories/KeyValueSchemaFactory.java    |   7 +-
 .../connector/pulsar/sink/PulsarSinkBuilder.java   |  65 ++++-
 .../connector/pulsar/sink/PulsarSinkOptions.java   |   6 +-
 .../writer/serializer/PulsarSchemaWrapper.java     |  14 ++
 .../serializer/PulsarSerializationSchema.java      |  25 +-
 .../pulsar/source/PulsarSourceBuilder.java         | 108 +++++++--
 .../pulsar/source/PulsarSourceOptions.java         |  14 ++
 .../pulsar/source/config/SourceConfiguration.java  |  14 ++
 .../source/reader/PulsarPartitionSplitReader.java  |   5 +-
 .../pulsar/source/reader/PulsarSourceReader.java   |  16 ++
 .../deserializer/PulsarDeserializationSchema.java  |  28 ++-
 .../PulsarDeserializationSchemaWrapper.java        |   2 +-
 .../reader/deserializer/PulsarSchemaWrapper.java   |  22 +-
 .../schema/factories/JSONSchemaFactoryTest.java    |   4 +-
 .../pulsar/sink/PulsarSinkBuilderTest.java         |   5 +-
 .../connector/pulsar/sink/PulsarSinkITCase.java    |   3 +-
 .../pulsar/sink/writer/PulsarWriterTest.java       |   4 +-
 .../pulsar/source/PulsarSourceBuilderTest.java     |   3 +-
 .../source/enumerator/cursor/StopCursorTest.java   |   1 +
 .../reader/PulsarPartitionSplitReaderTest.java     |   3 +
 .../source/reader/PulsarSourceReaderTest.java      |   5 +-
 .../PulsarDeserializationSchemaTest.java           | 268 ++++++++++++++++++++-
 .../testutils/sink/PulsarSinkTestContext.java      |   3 +-
 .../testutils/source/PulsarSourceTestContext.java  |   3 +-
 pom.xml                                            |  21 ++
 34 files changed, 721 insertions(+), 69 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
index a89ade4..31131ab 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -190,7 +190,7 @@ Pulsar Source 提供了两种订阅 Topic 或 Topic 分区的方式。
 
 如果用户只关心消息体的二进制字节流,并不需要其他属性来解析数据。可以直接使用预定义的 `PulsarDeserializationSchema`。Pulsar Source里面提供了 3 种预定义的反序列化器。
 
-- 使用 Pulsar 的 [Schema](https://pulsar.apache.org/docs/zh-CN/schema-understand/) 解析消息。
+- 使用 Pulsar 的 [Schema](https://pulsar.apache.org/docs/zh-CN/schema-understand/) 解析消息。如果使用 KeyValue 或者 Struct 类型的 Schema, 那么 Pulsar 的 `Schema` 将不会含有类型类信息, 但 `PulsarSchemaTypeInformation` 需要通过传入类型类信息来构造。因此我们提供的 API 支持用户传入类型信息。
   ```java
   // 基础数据类型
   PulsarDeserializationSchema.pulsarSchema(Schema);
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
index c1fead3..6933227 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -213,6 +213,8 @@ If only the raw payload of a message (message data in bytes) is needed,
 you can use the predefined `PulsarDeserializationSchema`. Pulsar connector provides three implementation methods.
 
 - Decode the message by using Pulsar's [Schema](https://pulsar.apache.org/docs/en/schema-understand/).
+  If using KeyValue type or Struct types, the pulsar `Schema` does not contain type class info. But it is
+  still needed to construct `PulsarSchemaTypeInformation`. So we provide two more APIs to pass the type info.
   ```java
   // Primitive types
   PulsarDeserializationSchema.pulsarSchema(Schema);
diff --git a/docs/layouts/shortcodes/generated/pulsar_sink_configuration.html b/docs/layouts/shortcodes/generated/pulsar_sink_configuration.html
index f207e5f..a40d3c0 100644
--- a/docs/layouts/shortcodes/generated/pulsar_sink_configuration.html
+++ b/docs/layouts/shortcodes/generated/pulsar_sink_configuration.html
@@ -24,7 +24,7 @@
             <td><h5>pulsar.sink.enableSchemaEvolution</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>If you enable this option and use PulsarSerializationSchema.pulsarSchema(), we would consume and deserialize the message by using Pulsar's <code class="highlighter-rouge">Schema</code>.</td>
+            <td>If you enable this option and use <code class="highlighter-rouge">PulsarSinkBuilder.setSerializationSchema(Schema)</code>, we would produce and serialize the message by using Pulsar's <code class="highlighter-rouge">Schema</code>.</td>
         </tr>
         <tr>
             <td><h5>pulsar.sink.maxRecommitTimes</h5></td>
diff --git a/docs/layouts/shortcodes/generated/pulsar_source_configuration.html b/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
index c66423d..925c1be 100644
--- a/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
+++ b/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
@@ -26,6 +26,12 @@
             <td>Boolean</td>
             <td>Flink commits the consuming position with pulsar transactions on checkpoint. However, if you have disabled the Flink checkpoint or disabled transaction for your Pulsar cluster, ensure that you have set this option to <code class="highlighter-rouge">true</code>.<br />The source would use pulsar client's internal mechanism and commit cursor in two ways.<ul><li>For <code class="highlighter-rouge">Key_Shared</code> and <code class="highlighter-rouge">Shared</code> subscriptio [...]
         </tr>
+        <tr>
+            <td><h5>pulsar.source.enableSchemaEvolution</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If you enable this option and use <code class="highlighter-rouge">PulsarSourceBuilder.setDeserializationSchema(Schema)</code>, we would consume and deserialize the message by using Pulsar's <code class="highlighter-rouge">Schema</code> interface with extra schema evolution check.</td>
+        </tr>
         <tr>
             <td><h5>pulsar.source.enableMetrics</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/flink-connector-pulsar/pom.xml b/flink-connector-pulsar/pom.xml
index e7fcfe2..ff36209 100644
--- a/flink-connector-pulsar/pom.xml
+++ b/flink-connector-pulsar/pom.xml
@@ -51,6 +51,22 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<!-- Formats -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<scope>provided</scope>
+			<optional>true</optional>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<scope>provided</scope>
+			<optional>true</optional>
+		</dependency>
+
 		<!-- Protobuf & Protobuf Native Schema support. Add it to your pom if you need protobuf -->
 
 		<dependency>
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/BytesSchema.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/BytesSchema.java
new file mode 100644
index 0000000..3bd4903
--- /dev/null
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/BytesSchema.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+import static org.apache.pulsar.client.internal.PulsarClientImplementationBinding.getBytes;
+
+/**
+ * This schema is a wrapper for the original schema. It will send the schema info to Pulsar for
+ * compatibility check. And didn't deserialize messages.
+ */
+public class BytesSchema implements Schema<byte[]>, Serializable {
+    private static final long serialVersionUID = -539752264675729127L;
+
+    private final PulsarSchema<?> schema;
+
+    public BytesSchema(PulsarSchema<?> schema) {
+        this.schema = schema;
+    }
+
+    @Override
+    public void validate(byte[] message) {
+        schema.getPulsarSchema().validate(message);
+    }
+
+    @Override
+    public byte[] encode(byte[] message) {
+        return message;
+    }
+
+    @Override
+    public boolean supportSchemaVersioning() {
+        return schema.getPulsarSchema().supportSchemaVersioning();
+    }
+
+    @Override
+    public byte[] decode(byte[] bytes) {
+        return bytes;
+    }
+
+    @Override
+    public byte[] decode(byte[] bytes, byte[] schemaVersion) {
+        // None of Pulsar's schema implementations have implemented this method.
+        return bytes;
+    }
+
+    @Override
+    public byte[] decode(ByteBuffer data, byte[] schemaVersion) {
+        return getBytes(data);
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return schema.getSchemaInfo();
+    }
+
+    @Override
+    public Schema<byte[]> clone() {
+        return this;
+    }
+}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeSerializer.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeSerializer.java
index 3c58c83..ef5c9f0 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeSerializer.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeSerializer.java
@@ -158,6 +158,11 @@ public class PulsarSchemaTypeSerializer<T> extends TypeSerializer<T> {
 
         private PulsarSchema<T> schema;
 
+        public PulsarSchemaTypeSerializerSnapshot() {
+            // Preserved for serialization in Flink.
+            // See TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass
+        }
+
         public PulsarSchemaTypeSerializerSnapshot(PulsarSchema<T> schema) {
             this.schema = schema;
         }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/factories/AvroSchemaFactory.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/factories/AvroSchemaFactory.java
index ab97b41..8288136 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/factories/AvroSchemaFactory.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/factories/AvroSchemaFactory.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.connector.pulsar.common.schema.factories;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroUtils;
+
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -45,4 +48,14 @@ public class AvroSchemaFactory<T> extends BaseStructSchemaFactory<T> {
 
         return AvroSchema.of(definition);
     }
+
+    @Override
+    public TypeInformation<T> createTypeInfo(SchemaInfo info) {
+        try {
+            Class<T> decodeClassInfo = decodeClassInfo(info);
+            return AvroUtils.getAvroUtils().createAvroTypeInfo(decodeClassInfo);
+        } catch (Exception e) {
+            return super.createTypeInfo(info);
+        }
+    }
 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/factories/JSONSchemaFactory.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/factories/JSONSchemaFactory.java
index 8d1a494..2280d35 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/factories/JSONSchemaFactory.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/factories/JSONSchemaFactory.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.connector.pulsar.common.schema.factories;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroUtils;
+
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -38,4 +41,14 @@ public class JSONSchemaFactory<T> extends BaseStructSchemaFactory<T> {
         Class<T> typeClass = decodeClassInfo(info);
         return JSONSchema.of(typeClass, info.getProperties());
     }
+
+    @Override
+    public TypeInformation<T> createTypeInfo(SchemaInfo info) {
+        try {
+            Class<T> decodeClassInfo = decodeClassInfo(info);
+            return AvroUtils.getAvroUtils().createAvroTypeInfo(decodeClassInfo);
+        } catch (Exception e) {
+            return super.createTypeInfo(info);
+        }
+    }
 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/factories/KeyValueSchemaFactory.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/factories/KeyValueSchemaFactory.java
index 893e7e6..d60f06d 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/factories/KeyValueSchemaFactory.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/factories/KeyValueSchemaFactory.java
@@ -69,11 +69,8 @@ public class KeyValueSchemaFactory<K, V> implements PulsarSchemaFactory<KeyValue
     public TypeInformation<KeyValue<K, V>> createTypeInfo(SchemaInfo info) {
         KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo = decodeKeyValueSchemaInfo(info);
 
-        Schema<K> keySchema = PulsarSchemaUtils.createSchema(kvSchemaInfo.getKey());
-        Class<K> keyClass = decodeClassInfo(keySchema.getSchemaInfo());
-
-        Schema<V> valueSchema = PulsarSchemaUtils.createSchema(kvSchemaInfo.getValue());
-        Class<V> valueClass = decodeClassInfo(valueSchema.getSchemaInfo());
+        Class<K> keyClass = decodeClassInfo(kvSchemaInfo.getKey());
+        Class<V> valueClass = decodeClassInfo(kvSchemaInfo.getValue());
 
         Schema<KeyValue<K, V>> schema = createSchema(info);
         PulsarSchema<KeyValue<K, V>> pulsarSchema =
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
index 4e8f9b2..1bdb053 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.pulsar.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
@@ -30,9 +31,11 @@ import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
 import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
 import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSchemaWrapper;
 import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchemaWrapper;
 import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
 
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -212,6 +215,54 @@ public class PulsarSinkBuilder<IN> {
         return this;
     }
 
+    /**
+     * Send messages to Pulsar by using the flink's {@link SerializationSchema}. It would serialize
+     * the message into a byte array and send it to Pulsar with {@link Schema#BYTES}.
+     */
+    public <T extends IN> PulsarSinkBuilder<T> setSerializationSchema(
+            SerializationSchema<T> serializationSchema) {
+        return setSerializationSchema(new PulsarSerializationSchemaWrapper<>(serializationSchema));
+    }
+
+    /**
+     * Send messages to Pulsar by using the Pulsar {@link Schema} instance. It would serialize the
+     * message into a byte array and send it to Pulsar with {@link Schema#BYTES}. You can directly
+     * use the Schema you provided by enabling the {@link #enableSchemaEvolution()}.
+     *
+     * <p>We only support <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#primitive-type">primitive
+     * types</a> here.
+     */
+    public <T extends IN> PulsarSinkBuilder<T> setSerializationSchema(Schema<T> schema) {
+        return setSerializationSchema(new PulsarSchemaWrapper<>(schema));
+    }
+
+    /**
+     * Send messages to Pulsar by using the Pulsar {@link Schema} instance. It would serialize the
+     * message into a byte array and send it to Pulsar with {@link Schema#BYTES}. You can directly
+     * use the Schema you provided by enabling the {@link #enableSchemaEvolution()}.
+     *
+     * <p>We only support <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#struct">struct types</a> here.
+     */
+    public <T extends IN> PulsarSinkBuilder<T> setSerializationSchema(
+            Schema<T> schema, Class<T> typeClass) {
+        return setSerializationSchema(new PulsarSchemaWrapper<>(schema, typeClass));
+    }
+
+    /**
+     * Send messages to Pulsar by using the Pulsar {@link Schema} instance. It would serialize the
+     * message into a byte array and send it to Pulsar with {@link Schema#BYTES}. You can directly
+     * use the Schema you provided by enabling the {@link #enableSchemaEvolution()}.
+     *
+     * <p>We only support <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#keyvalue">keyvalue types</a> here.
+     */
+    public <K, V, T extends IN> PulsarSinkBuilder<T> setSerializationSchema(
+            Schema<KeyValue<K, V>> schema, Class<K> keyClass, Class<V> valueClass) {
+        return setSerializationSchema(new PulsarSchemaWrapper<>(schema, keyClass, valueClass));
+    }
+
     /**
      * Sets the {@link PulsarSerializationSchema} that transforms incoming records to bytes.
      *
@@ -356,12 +407,16 @@ public class PulsarSinkBuilder<IN> {
         }
 
         checkNotNull(serializationSchema, "serializationSchema must be set.");
-        if (serializationSchema instanceof PulsarSchemaWrapper
-                && !Boolean.TRUE.equals(configBuilder.get(PULSAR_WRITE_SCHEMA_EVOLUTION))) {
+        // Schema evolution validation.
+        if (Boolean.TRUE.equals(configBuilder.get(PULSAR_WRITE_SCHEMA_EVOLUTION))) {
+            checkState(
+                    serializationSchema instanceof PulsarSchemaWrapper,
+                    "When enabling schema evolution, you must provide a Pulsar Schema in builder's setSerializationSchema method.");
+        } else if (serializationSchema instanceof PulsarSchemaWrapper) {
             LOG.info(
-                    "It seems like you want to send message in Pulsar Schema."
-                            + " You can enableSchemaEvolution for using this feature."
-                            + " We would use Schema.BYTES as the default schema if you don't enable this option.");
+                    "It seems like you are sending messages by using Pulsar Schema."
+                            + " You can builder.enableSchemaEvolution() to enable schema evolution for better Pulsar Schema check."
+                            + " We would use bypass Schema check by default.");
         }
 
         // Topic metadata listener validation.
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
index 39b5f73..e1218d4 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
@@ -115,8 +115,10 @@ public final class PulsarSinkOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "If you enable this option and use PulsarSerializationSchema.pulsarSchema(),"
-                                                    + " we would consume and deserialize the message by using Pulsar's %s.",
+                                            "If you enable this option and use %s,"
+                                                    + " we would produce and serialize the message by using Pulsar's %s.",
+                                            code(
+                                                    "PulsarSinkBuilder.setSerializationSchema(Schema)"),
                                             code("Schema"))
                                     .build());
 
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java
index 0d5aaf0..0d508e5 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
 import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder;
 
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValue;
 
 /**
  * Wrap the Pulsar's Schema into PulsarSerializationSchema. We support schema evolution out of box
@@ -36,6 +37,19 @@ public class PulsarSchemaWrapper<IN> implements PulsarSerializationSchema<IN> {
 
     private final PulsarSchema<IN> pulsarSchema;
 
+    public PulsarSchemaWrapper(Schema<IN> schema) {
+        this(new PulsarSchema<>(schema));
+    }
+
+    public PulsarSchemaWrapper(Schema<IN> schema, Class<IN> clazz) {
+        this(new PulsarSchema<>(schema, clazz));
+    }
+
+    public <K, V> PulsarSchemaWrapper(
+            Schema<KeyValue<K, V>> schema, Class<K> keyClass, Class<V> valueClass) {
+        this(new PulsarSchema<>(schema, keyClass, valueClass));
+    }
+
     public PulsarSchemaWrapper(PulsarSchema<IN> pulsarSchema) {
         this.pulsarSchema = pulsarSchema;
     }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java
index da7f706..45e3279 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.pulsar.sink.writer.serializer;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
-import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
 import org.apache.flink.connector.pulsar.sink.PulsarSinkBuilder;
 import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
 import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
@@ -76,7 +75,11 @@ public interface PulsarSerializationSchema<IN> extends Serializable {
     /**
      * Create a PulsarSerializationSchema by using the flink's {@link SerializationSchema}. It would
      * serialize the message into byte array and send it to Pulsar with {@link Schema#BYTES}.
+     *
+     * @deprecated Use {@link PulsarSinkBuilder#setSerializationSchema(SerializationSchema)}
+     *     instead.
      */
+    @Deprecated
     static <T> PulsarSerializationSchema<T> flinkSchema(
             SerializationSchema<T> serializationSchema) {
         return new PulsarSerializationSchemaWrapper<>(serializationSchema);
@@ -91,10 +94,12 @@ public interface PulsarSerializationSchema<IN> extends Serializable {
      * <p>We only support <a
      * href="https://pulsar.apache.org/docs/en/schema-understand/#primitive-type">primitive
      * types</a> here.
+     *
+     * @deprecated Use {@link PulsarSinkBuilder#setSerializationSchema(Schema)} instead.
      */
+    @Deprecated
     static <T> PulsarSerializationSchema<T> pulsarSchema(Schema<T> schema) {
-        PulsarSchema<T> pulsarSchema = new PulsarSchema<>(schema);
-        return new PulsarSchemaWrapper<>(pulsarSchema);
+        return new PulsarSchemaWrapper<>(schema);
     }
 
     /**
@@ -105,10 +110,12 @@ public interface PulsarSerializationSchema<IN> extends Serializable {
      *
      * <p>We only support <a
      * href="https://pulsar.apache.org/docs/en/schema-understand/#struct">struct types</a> here.
+     *
+     * @deprecated Use {@link PulsarSinkBuilder#setSerializationSchema(Schema, Class)} instead.
      */
+    @Deprecated
     static <T> PulsarSerializationSchema<T> pulsarSchema(Schema<T> schema, Class<T> typeClass) {
-        PulsarSchema<T> pulsarSchema = new PulsarSchema<>(schema, typeClass);
-        return new PulsarSchemaWrapper<>(pulsarSchema);
+        return new PulsarSchemaWrapper<>(schema, typeClass);
     }
 
     /**
@@ -119,11 +126,13 @@ public interface PulsarSerializationSchema<IN> extends Serializable {
      *
      * <p>We only support <a
      * href="https://pulsar.apache.org/docs/en/schema-understand/#keyvalue">keyvalue types</a> here.
+     *
+     * @deprecated Use {@link PulsarSinkBuilder#setSerializationSchema(Schema, Class, Class)}
+     *     instead.
      */
+    @Deprecated
     static <K, V> PulsarSerializationSchema<KeyValue<K, V>> pulsarSchema(
             Schema<KeyValue<K, V>> schema, Class<K> keyClass, Class<V> valueClass) {
-        PulsarSchema<KeyValue<K, V>> pulsarSchema =
-                new PulsarSchema<>(schema, keyClass, valueClass);
-        return new PulsarSchemaWrapper<>(pulsarSchema);
+        return new PulsarSchemaWrapper<>(schema, keyClass, valueClass);
     }
 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index 5cd59b5..5fb9d0b 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -19,6 +19,8 @@
 package org.apache.flink.connector.pulsar.source;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.configuration.ConfigOption;
@@ -34,10 +36,14 @@ import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaWrapper;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaWrapper;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarTypeInformationWrapper;
 
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +60,7 @@ import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULS
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_SCHEMA_EVOLUTION;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.SOURCE_CONFIG_VALIDATOR;
 import static org.apache.flink.util.InstantiationUtil.isSerializable;
@@ -163,7 +170,7 @@ public final class PulsarSourceBuilder<OUT> {
     }
 
     /**
-     * {@link SubscriptionType} is the consuming behavior for pulsar, we would generator different
+     * {@link SubscriptionType} is the consuming behavior for pulsar, we would generate different
      * split by the given subscription type. Please take some time to consider which subscription
      * type matches your application best. Default is {@link SubscriptionType#Shared}.
      *
@@ -179,12 +186,12 @@ public final class PulsarSourceBuilder<OUT> {
     }
 
     /**
-     * Set a pulsar topic list for flink source. Some topic may not exist currently, consuming this
-     * non-existed topic wouldn't throw any exception. But the best solution is just consuming by
-     * using a topic regex. You can set topics once either with {@link #setTopics} or {@link
+     * Set a pulsar topic list for the flink source. Some topics may not exist currently, consuming
+     * this non-existed topic wouldn't throw any exception. But the best solution is just consuming
+     * by using a topic regex. You can set topics once either with {@link #setTopics} or {@link
      * #setTopicPattern} in this builder.
      *
-     * @param topics The topic list you would like to consume message.
+     * @param topics The topic list you would like to consume messages.
      * @return this PulsarSourceBuilder.
      */
     public PulsarSourceBuilder<OUT> setTopics(String... topics) {
@@ -192,12 +199,12 @@ public final class PulsarSourceBuilder<OUT> {
     }
 
     /**
-     * Set a pulsar topic list for flink source. Some topic may not exist currently, consuming this
-     * non-existed topic wouldn't throw any exception. But the best solution is just consuming by
-     * using a topic regex. You can set topics once either with {@link #setTopics} or {@link
+     * Set a pulsar topic list for the flink source. Some topics may not exist currently, consuming
+     * this non-existed topic wouldn't throw any exception. But the best solution is just consuming
+     * by using a topic regex. You can set topics once either with {@link #setTopics} or {@link
      * #setTopicPattern} in this builder.
      *
-     * @param topics The topic list you would like to consume message.
+     * @param topics The topic list you would like to consume messages.
      * @return this PulsarSourceBuilder.
      */
     public PulsarSourceBuilder<OUT> setTopics(List<String> topics) {
@@ -248,7 +255,7 @@ public final class PulsarSourceBuilder<OUT> {
      *
      * @param topicsPattern the pattern of the topic name to consume from.
      * @param regexSubscriptionMode When subscribing to a topic using a regular expression, you can
-     *     pick a certain type of topics.
+     *     pick a certain type of topic.
      *     <ul>
      *       <li>PersistentOnly: only subscribe to persistent topics.
      *       <li>NonPersistentOnly: only subscribe to non-persistent topics.
@@ -273,6 +280,15 @@ public final class PulsarSourceBuilder<OUT> {
         return setConfig(PULSAR_CONSUMER_NAME, consumerName);
     }
 
+    /**
+     * If you enable this option, we would consume and deserialize the message by using Pulsar
+     * {@link Schema}.
+     */
+    public PulsarSourceBuilder<OUT> enableSchemaEvolution() {
+        configBuilder.set(PULSAR_READ_SCHEMA_EVOLUTION, true);
+        return this;
+    }
+
     /**
      * Set a topic range generator for consuming a sub set of keys.
      *
@@ -345,12 +361,63 @@ public final class PulsarSourceBuilder<OUT> {
     }
 
     /**
-     * DeserializationSchema is required for getting the {@link Schema} for deserialize message from
-     * pulsar and getting the {@link TypeInformation} for message serialization in flink.
+     * Deserialize messages from Pulsar by using the flink's {@link DeserializationSchema}. It would
+     * consume the pulsar message as a byte array and decode the message by using flink's logic.
+     */
+    public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+            DeserializationSchema<T> deserializationSchema) {
+        return setDeserializationSchema(
+                new PulsarDeserializationSchemaWrapper<>(deserializationSchema));
+    }
+
+    /**
+     * Deserialize messages from Pulsar by using the Pulsar {@link Schema} instance. It would
+     * consume the pulsar message as a byte array and decode the message by using flink's logic.
+     *
+     * <p>We only support <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#primitive-type">primitive
+     * types</a> here.
+     */
+    public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(Schema<T> schema) {
+        return setDeserializationSchema(new PulsarSchemaWrapper<>(schema));
+    }
+
+    /**
+     * Deserialize messages from Pulsar by using the Pulsar {@link Schema} instance. It would
+     * consume the pulsar message as a byte array and decode the message by using flink's logic.
      *
-     * <p>We have defined a set of implementations, using {@code
-     * PulsarDeserializationSchema#pulsarSchema} or {@code PulsarDeserializationSchema#flinkSchema}
-     * for creating the desired schema.
+     * <p>We only support <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#struct">struct types</a> here.
+     */
+    public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+            Schema<T> schema, Class<T> typeClass) {
+        return setDeserializationSchema(new PulsarSchemaWrapper<>(schema, typeClass));
+    }
+
+    /**
+     * Deserialize messages from Pulsar by using the Pulsar {@link Schema} instance. It would
+     * consume the pulsar message as a byte array and decode the message by using flink's logic.
+     *
+     * <p>We only support <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#keyvalue">keyvalue types</a> here.
+     */
+    public <K, V, T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+            Schema<KeyValue<K, V>> schema, Class<K> keyClass, Class<V> valueClass) {
+        return setDeserializationSchema(new PulsarSchemaWrapper<>(schema, keyClass, valueClass));
+    }
+
+    /**
+     * Deserialize messages from Pulsar by using the flink's {@link TypeInformation}. This method is
+     * only used for treating messages that was written into pulsar by {@link TypeInformation}.
+     */
+    public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+            TypeInformation<T> information, ExecutionConfig config) {
+        return setDeserializationSchema(new PulsarTypeInformationWrapper<>(information, config));
+    }
+
+    /**
+     * PulsarDeserializationSchema is required for deserializing messages from Pulsar and getting
+     * the {@link TypeInformation} for message serialization in flink.
      */
     public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
             PulsarDeserializationSchema<T> deserializationSchema) {
@@ -463,6 +530,17 @@ public final class PulsarSourceBuilder<OUT> {
         }
 
         checkNotNull(deserializationSchema, "deserializationSchema should be set.");
+        // Schema evolution validation.
+        if (Boolean.TRUE.equals(configBuilder.get(PULSAR_READ_SCHEMA_EVOLUTION))) {
+            checkState(
+                    deserializationSchema instanceof PulsarSchemaWrapper,
+                    "When enabling schema evolution, you must provide a Pulsar Schema in builder's setDeserializationSchema method.");
+        } else if (deserializationSchema instanceof PulsarSchemaWrapper) {
+            LOG.info(
+                    "It seems like you are consuming messages by using Pulsar Schema."
+                            + " You can builder.enableSchemaEvolution() to enable schema evolution for better Pulsar Schema check."
+                            + " We would use bypass Schema check by default.");
+        }
 
         if (!configBuilder.contains(PULSAR_CONSUMER_NAME)) {
             LOG.warn(
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index cd61631..4099a52 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -199,6 +199,20 @@ public final class PulsarSourceOptions {
                                             "In this case, a single consumer will still receive all the keys, but they may be coming in different orders.")
                                     .build());
 
+    public static final ConfigOption<Boolean> PULSAR_READ_SCHEMA_EVOLUTION =
+            ConfigOptions.key(SOURCE_CONFIG_PREFIX + "enableSchemaEvolution")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "If you enable this option and use %s,"
+                                                    + " we would consume and deserialize the message by using Pulsar's %s interface with extra schema evolution check.",
+                                            code(
+                                                    "PulsarSourceBuilder.setDeserializationSchema(Schema)"),
+                                            code("Schema"))
+                                    .build());
+
     public static final ConfigOption<Boolean> PULSAR_ENABLE_SOURCE_METRICS =
             ConfigOptions.key(SOURCE_CONFIG_PREFIX + "enableMetrics")
                     .booleanType()
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
index 63e2813..96e886a 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 
@@ -41,6 +42,7 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_SCHEMA_EVOLUTION;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_MODE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_VERIFY_INITIAL_OFFSETS;
@@ -60,6 +62,7 @@ public class SourceConfiguration extends PulsarConfiguration {
     private final String subscriptionName;
     private final SubscriptionMode subscriptionMode;
     private final boolean allowKeySharedOutOfOrderDelivery;
+    private final boolean enableSchemaEvolution;
     private final boolean enableMetrics;
 
     public SourceConfiguration(Configuration configuration) {
@@ -75,6 +78,7 @@ public class SourceConfiguration extends PulsarConfiguration {
         this.subscriptionName = get(PULSAR_SUBSCRIPTION_NAME);
         this.subscriptionMode = get(PULSAR_SUBSCRIPTION_MODE);
         this.allowKeySharedOutOfOrderDelivery = get(PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY);
+        this.enableSchemaEvolution = get(PULSAR_READ_SCHEMA_EVOLUTION);
         this.enableMetrics =
                 get(PULSAR_ENABLE_SOURCE_METRICS) && get(PULSAR_STATS_INTERVAL_SECONDS) > 0;
     }
@@ -168,6 +172,14 @@ public class SourceConfiguration extends PulsarConfiguration {
         return allowKeySharedOutOfOrderDelivery;
     }
 
+    /**
+     * If we need to deserialize the message with a specified Pulsar {@link Schema} instead the
+     * default {@link Schema#BYTES}. This switch is only used for {@code PulsarSchemaWrapper}.
+     */
+    public boolean isEnableSchemaEvolution() {
+        return enableSchemaEvolution;
+    }
+
     /** Whether to expose the metrics from Pulsar Consumer. */
     public boolean isEnableMetrics() {
         return enableMetrics;
@@ -199,6 +211,7 @@ public class SourceConfiguration extends PulsarConfiguration {
                 && Objects.equals(subscriptionName, that.subscriptionName)
                 && subscriptionMode == that.subscriptionMode
                 && allowKeySharedOutOfOrderDelivery == that.allowKeySharedOutOfOrderDelivery
+                && enableSchemaEvolution == that.enableSchemaEvolution
                 && enableMetrics == that.enableMetrics;
     }
 
@@ -215,6 +228,7 @@ public class SourceConfiguration extends PulsarConfiguration {
                 subscriptionName,
                 subscriptionMode,
                 allowKeySharedOutOfOrderDelivery,
+                enableSchemaEvolution,
                 enableMetrics);
     }
 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
index 9a6873c..664aa7a 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
@@ -94,6 +94,7 @@ public class PulsarPartitionSplitReader
     private final PulsarClient pulsarClient;
     @VisibleForTesting final PulsarAdmin pulsarAdmin;
     @VisibleForTesting final SourceConfiguration sourceConfiguration;
+    private final Schema<byte[]> schema;
     private final SourceReaderMetricGroup metricGroup;
 
     private Consumer<byte[]> pulsarConsumer;
@@ -103,10 +104,12 @@ public class PulsarPartitionSplitReader
             PulsarClient pulsarClient,
             PulsarAdmin pulsarAdmin,
             SourceConfiguration sourceConfiguration,
+            Schema<byte[]> schema,
             SourceReaderMetricGroup metricGroup) {
         this.pulsarClient = pulsarClient;
         this.pulsarAdmin = pulsarAdmin;
         this.sourceConfiguration = sourceConfiguration;
+        this.schema = schema;
         this.metricGroup = metricGroup;
     }
 
@@ -273,7 +276,7 @@ public class PulsarPartitionSplitReader
     /** Create a specified {@link Consumer} by the given topic partition. */
     protected Consumer<byte[]> createPulsarConsumer(TopicPartition partition) {
         ConsumerBuilder<byte[]> consumerBuilder =
-                createConsumerBuilder(pulsarClient, Schema.BYTES, sourceConfiguration);
+                createConsumerBuilder(pulsarClient, schema, sourceConfiguration);
 
         consumerBuilder.topic(partition.getFullTopicName());
 
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
index 75d3491..6f8ff2c 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
@@ -26,9 +26,12 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderBase;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.pulsar.common.schema.BytesSchema;
+import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaWrapper;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
 import org.apache.flink.core.io.InputStatus;
@@ -38,6 +41,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -258,6 +262,17 @@ public class PulsarSourceReader<OUT>
         PulsarClient pulsarClient = createClient(sourceConfiguration);
         PulsarAdmin pulsarAdmin = createAdmin(sourceConfiguration);
 
+        // Choose the right schema bytes to use.
+        Schema<byte[]> schema;
+        if (sourceConfiguration.isEnableSchemaEvolution()) {
+            // Wrap the schema into a byte array schema with extra schema info check.
+            PulsarSchema<?> pulsarSchema =
+                    ((PulsarSchemaWrapper<?>) deserializationSchema).pulsarSchema();
+            schema = new BytesSchema(pulsarSchema);
+        } else {
+            schema = Schema.BYTES;
+        }
+
         // Create an ordered split reader supplier.
         Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> splitReaderSupplier =
                 () ->
@@ -265,6 +280,7 @@ public class PulsarSourceReader<OUT>
                                 pulsarClient,
                                 pulsarAdmin,
                                 sourceConfiguration,
+                                schema,
                                 readerContext.metricGroup());
 
         PulsarSourceFetcherManager fetcherManager =
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchema.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchema.java
index a14f1d4..a72a62d 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchema.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchema.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.serialization.DeserializationSchema.Initializ
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
+import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.util.Collector;
 
@@ -58,11 +59,11 @@ public interface PulsarDeserializationSchema<T> extends Serializable, ResultType
 
     /**
      * Deserializes the pulsar message. This message could be a raw byte message or some parsed
-     * message which decoded by pulsar schema.
+     * message which is decoded by pulsar schema.
      *
-     * <p>You can output multiple message by using the {@link Collector}. Note that number and size
+     * <p>You can output multiple messages by using the {@link Collector}. Note that number and size
      * of the produced records should be relatively small. Depending on the source implementation
-     * records can be buffered in memory or collecting records might delay emitting checkpoint
+     * records can be buffered in memory or collecting records might delay the emitting checkpoint
      * barrier.
      *
      * @param message The message decoded by pulsar.
@@ -72,8 +73,13 @@ public interface PulsarDeserializationSchema<T> extends Serializable, ResultType
 
     /**
      * Create a PulsarDeserializationSchema by using the flink's {@link DeserializationSchema}. It
-     * would consume the pulsar message as byte array and decode the message by using flink's logic.
+     * would consume the pulsar message as the byte array and decode the message by using flink's
+     * logic.
+     *
+     * @deprecated Use {@link PulsarSourceBuilder#setDeserializationSchema(DeserializationSchema)}
+     *     instead.
      */
+    @Deprecated
     static <T> PulsarDeserializationSchema<T> flinkSchema(
             DeserializationSchema<T> deserializationSchema) {
         return new PulsarDeserializationSchemaWrapper<>(deserializationSchema);
@@ -86,7 +92,10 @@ public interface PulsarDeserializationSchema<T> extends Serializable, ResultType
      * <p>We only support <a
      * href="https://pulsar.apache.org/docs/en/schema-understand/#primitive-type">primitive
      * types</a> here.
+     *
+     * @deprecated Use {@link PulsarSourceBuilder#setDeserializationSchema(Schema)} instead.
      */
+    @Deprecated
     static <T> PulsarDeserializationSchema<T> pulsarSchema(Schema<T> schema) {
         PulsarSchema<T> pulsarSchema = new PulsarSchema<>(schema);
         return new PulsarSchemaWrapper<>(pulsarSchema);
@@ -98,7 +107,10 @@ public interface PulsarDeserializationSchema<T> extends Serializable, ResultType
      *
      * <p>We only support <a
      * href="https://pulsar.apache.org/docs/en/schema-understand/#struct">struct types</a> here.
+     *
+     * @deprecated Use {@link PulsarSourceBuilder#setDeserializationSchema(Schema, Class)} instead.
      */
+    @Deprecated
     static <T> PulsarDeserializationSchema<T> pulsarSchema(Schema<T> schema, Class<T> typeClass) {
         PulsarSchema<T> pulsarSchema = new PulsarSchema<>(schema, typeClass);
         return new PulsarSchemaWrapper<>(pulsarSchema);
@@ -110,7 +122,11 @@ public interface PulsarDeserializationSchema<T> extends Serializable, ResultType
      *
      * <p>We only support <a
      * href="https://pulsar.apache.org/docs/en/schema-understand/#keyvalue">keyvalue types</a> here.
+     *
+     * @deprecated Use {@link PulsarSourceBuilder#setDeserializationSchema(Schema, Class, Class)}
+     *     instead.
      */
+    @Deprecated
     static <K, V> PulsarDeserializationSchema<KeyValue<K, V>> pulsarSchema(
             Schema<KeyValue<K, V>> schema, Class<K> keyClass, Class<V> valueClass) {
         PulsarSchema<KeyValue<K, V>> pulsarSchema =
@@ -121,7 +137,11 @@ public interface PulsarDeserializationSchema<T> extends Serializable, ResultType
     /**
      * Create a PulsarDeserializationSchema by using the given {@link TypeInformation}. This method
      * is only used for treating message that was written into pulsar by {@link TypeInformation}.
+     *
+     * @deprecated Use {@link PulsarSourceBuilder#setDeserializationSchema(TypeInformation,
+     *     ExecutionConfig)} instead.
      */
+    @Deprecated
     static <T> PulsarDeserializationSchema<T> flinkTypeInfo(
             TypeInformation<T> information, ExecutionConfig config) {
         return new PulsarTypeInformationWrapper<>(information, config);
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaWrapper.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaWrapper.java
index e9b2779..aac8c96 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaWrapper.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaWrapper.java
@@ -35,7 +35,7 @@ import org.apache.pulsar.client.api.Message;
  * @param <T> The output type of the message.
  */
 @Internal
-class PulsarDeserializationSchemaWrapper<T> implements PulsarDeserializationSchema<T> {
+public class PulsarDeserializationSchemaWrapper<T> implements PulsarDeserializationSchema<T> {
     private static final long serialVersionUID = -630646912412751300L;
 
     private final DeserializationSchema<T> deserializationSchema;
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarSchemaWrapper.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarSchemaWrapper.java
index 7926d80..37553a5 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarSchemaWrapper.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarSchemaWrapper.java
@@ -24,23 +24,37 @@ import org.apache.flink.util.Collector;
 
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 import static org.apache.flink.connector.pulsar.common.schema.PulsarSchemaUtils.createTypeInformation;
 
 /**
  * The deserialization schema wrapper for pulsar original {@link Schema}. Pulsar would deserialize
- * the message and pass it to flink with a auto generate or given {@link TypeInformation}.
+ * the message and pass it to flink with an auto generated {@link TypeInformation}.
  *
  * @param <T> The output type of the message.
  */
 @Internal
-class PulsarSchemaWrapper<T> implements PulsarDeserializationSchema<T> {
+public class PulsarSchemaWrapper<T> implements PulsarDeserializationSchema<T> {
     private static final long serialVersionUID = -4864701207257059158L;
 
     /** The serializable pulsar schema, it wrap the schema with type class. */
     private final PulsarSchema<T> pulsarSchema;
 
+    public PulsarSchemaWrapper(Schema<T> schema) {
+        this(new PulsarSchema<>(schema));
+    }
+
+    public PulsarSchemaWrapper(Schema<T> schema, Class<T> clazz) {
+        this(new PulsarSchema<>(schema, clazz));
+    }
+
+    public <K, V> PulsarSchemaWrapper(
+            Schema<KeyValue<K, V>> schema, Class<K> keyClass, Class<V> valueClass) {
+        this(new PulsarSchema<>(schema, keyClass, valueClass));
+    }
+
     public PulsarSchemaWrapper(PulsarSchema<T> pulsarSchema) {
         this.pulsarSchema = pulsarSchema;
     }
@@ -59,4 +73,8 @@ class PulsarSchemaWrapper<T> implements PulsarDeserializationSchema<T> {
         SchemaInfo info = pulsarSchema.getSchemaInfo();
         return createTypeInformation(info);
     }
+
+    public PulsarSchema<?> pulsarSchema() {
+        return pulsarSchema;
+    }
 }
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/factories/JSONSchemaFactoryTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/factories/JSONSchemaFactoryTest.java
index e4b2dc1..d67b64c 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/factories/JSONSchemaFactoryTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/factories/JSONSchemaFactoryTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.connector.pulsar.common.schema.factories;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
-import org.apache.flink.connector.pulsar.common.schema.PulsarSchemaTypeInformation;
 import org.apache.flink.connector.pulsar.testutils.SampleData.FL;
 import org.apache.flink.connector.pulsar.testutils.SampleData.Foo;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.pulsar.client.api.Schema;
@@ -62,7 +62,7 @@ class JSONSchemaFactoryTest {
         TypeInformation<FL> typeInfo = factory.createTypeInfo(pulsarSchema.getSchemaInfo());
 
         assertThat(typeInfo)
-                .isInstanceOf(PulsarSchemaTypeInformation.class)
+                .isInstanceOf(AvroTypeInfo.class)
                 .hasFieldOrPropertyWithValue("typeClass", FL.class);
 
         // TypeInformation serialization.
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
index 0e0db88..6de87b9 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
@@ -29,7 +29,6 @@ import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_SE
 import static org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode.CUSTOM;
 import static org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode.MESSAGE_KEY_HASH;
 import static org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode.ROUND_ROBIN;
-import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -85,7 +84,7 @@ class PulsarSinkBuilderTest {
     @Test
     void emptyTopicShouldHaveCustomTopicRouter() {
         PulsarSinkBuilder<String> builder = PulsarSink.builder();
-        builder.setSerializationSchema(flinkSchema(new SimpleStringSchema()));
+        builder.setSerializationSchema(new SimpleStringSchema());
 
         NullPointerException exception = assertThrows(NullPointerException.class, builder::build);
         assertThat(exception).hasMessage("No topic names or custom topic router are provided.");
@@ -94,7 +93,7 @@ class PulsarSinkBuilderTest {
     @Test
     void serviceUrlAndAdminUrlMustBeProvided() {
         PulsarSinkBuilder<String> builder = PulsarSink.builder();
-        builder.setSerializationSchema(flinkSchema(new SimpleStringSchema()));
+        builder.setSerializationSchema(new SimpleStringSchema());
         builder.setTopics("a", "b");
         assertThrows(IllegalArgumentException.class, builder::build);
 
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index e73d240..fff75ae 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -50,7 +50,6 @@ import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;
 import static org.apache.pulsar.client.api.Schema.STRING;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -123,7 +122,7 @@ class PulsarSinkITCase {
                             .setAdminUrl(operator().adminUrl())
                             .setDeliveryGuarantee(guarantee)
                             .setTopics(topic)
-                            .setSerializationSchema(flinkSchema(new SimpleStringSchema()))
+                            .setSerializationSchema(new SimpleStringSchema())
                             .build();
 
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
index eb766ee..41d3928 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
 import org.apache.flink.connector.pulsar.sink.writer.delayer.FixedMessageDelayer;
 import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;
 import org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSchemaWrapper;
 import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
 import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
@@ -54,7 +55,6 @@ import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION;
-import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.pulsarSchema;
 import static org.apache.pulsar.client.api.Schema.STRING;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -71,7 +71,7 @@ class PulsarWriterTest extends PulsarTestSuiteBase {
         operator().createTopic(topic, 8);
 
         SinkConfiguration configuration = sinkConfiguration(guarantee);
-        PulsarSerializationSchema<String> schema = pulsarSchema(STRING);
+        PulsarSerializationSchema<String> schema = new PulsarSchemaWrapper<>(STRING);
         TopicMetadataListener listener = new TopicMetadataListener(singletonList(topic));
         RoundRobinTopicRouter<String> router = new RoundRobinTopicRouter<>(configuration);
         FixedMessageDelayer<String> delayer = MessageDelayer.never();
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
index 29d74c4..390ad07 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.pulsar.source;
 import org.apache.pulsar.client.api.Schema;
 import org.junit.jupiter.api.Test;
 
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertAll;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -61,6 +60,6 @@ class PulsarSourceBuilderTest {
         builder.setServiceUrl("service-url");
         builder.setSubscriptionName("subscription-name");
         builder.setTopics("topic");
-        builder.setDeserializationSchema(pulsarSchema(Schema.STRING));
+        builder.setDeserializationSchema(Schema.STRING);
     }
 }
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
index cadf019..4ad80d0 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -58,6 +58,7 @@ class StopCursorTest extends PulsarTestSuiteBase {
                         operator().client(),
                         operator().admin(),
                         sourceConfig(),
+                        Schema.BYTES,
                         createSourceReaderMetricGroup());
         // send the first message and set the stopCursor to filter any late stopCursor
         operator()
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
index cfac1ad..52698ea 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.connector.pulsar.source.reader;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.pulsar.common.schema.BytesSchema;
+import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
@@ -269,6 +271,7 @@ class PulsarPartitionSplitReaderTest extends PulsarTestSuiteBase {
                 operator().client(),
                 operator().admin(),
                 sourceConfig(),
+                new BytesSchema(new PulsarSchema<>(STRING)),
                 createSourceReaderMetricGroup());
     }
 
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
index b9fd6b2..cbad32b 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaInitializationContext;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaWrapper;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
@@ -62,7 +63,6 @@ import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
 import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.createPartitionSplit;
 import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.createPartitionSplits;
 import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS;
@@ -231,7 +231,8 @@ class PulsarSourceReaderTest extends PulsarTestSuiteBase {
         configuration.set(PULSAR_MAX_FETCH_TIME, 1000L);
         configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
 
-        PulsarDeserializationSchema<Integer> deserializationSchema = pulsarSchema(Schema.INT32);
+        PulsarDeserializationSchema<Integer> deserializationSchema =
+                new PulsarSchemaWrapper<>(Schema.INT32);
         SourceReaderContext context = new TestingReaderContext();
         try {
             deserializationSchema.open(
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
index 18888df..f833b16 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
@@ -18,13 +18,21 @@
 
 package org.apache.flink.connector.pulsar.source.reader.deserializer;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.connector.pulsar.SampleMessage.TestMessage;
+import org.apache.flink.connector.pulsar.source.PulsarSource;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
 import org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.StringValue;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.function.FunctionWithException;
@@ -33,28 +41,31 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.junit.jupiter.api.Test;
 
+import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema;
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkTypeInfo;
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.pulsar.client.api.Schema.PROTOBUF_NATIVE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.Mockito.mock;
 
 /** Unit tests for {@link PulsarDeserializationSchema}. */
-class PulsarDeserializationSchemaTest {
+class PulsarDeserializationSchemaTest extends PulsarTestSuiteBase {
 
     @Test
     void createFromFlinkDeserializationSchema() throws Exception {
-        PulsarDeserializationSchema<String> schema = flinkSchema(new SimpleStringSchema());
+        PulsarDeserializationSchema<String> schema =
+                new PulsarDeserializationSchemaWrapper<>(new SimpleStringSchema());
         schema.open(new TestingDeserializationContext(), mock(SourceConfiguration.class));
         assertDoesNotThrow(() -> InstantiationUtil.clone(schema));
 
@@ -69,7 +80,8 @@ class PulsarDeserializationSchemaTest {
     @Test
     void createFromPulsarSchema() throws Exception {
         Schema<TestMessage> schema1 = PROTOBUF_NATIVE(TestMessage.class);
-        PulsarDeserializationSchema<TestMessage> schema2 = pulsarSchema(schema1, TestMessage.class);
+        PulsarDeserializationSchema<TestMessage> schema2 =
+                new PulsarSchemaWrapper<>(schema1, TestMessage.class);
         schema2.open(new TestingDeserializationContext(), mock(SourceConfiguration.class));
         assertDoesNotThrow(() -> InstantiationUtil.clone(schema2));
 
@@ -89,7 +101,8 @@ class PulsarDeserializationSchemaTest {
 
     @Test
     void createFromFlinkTypeInformation() throws Exception {
-        PulsarDeserializationSchema<String> schema = flinkTypeInfo(Types.STRING, null);
+        PulsarDeserializationSchema<String> schema =
+                new PulsarTypeInformationWrapper<>(Types.STRING, null);
         schema.open(new TestingDeserializationContext(), mock(SourceConfiguration.class));
         assertDoesNotThrow(() -> InstantiationUtil.clone(schema));
 
@@ -108,6 +121,247 @@ class PulsarDeserializationSchemaTest {
         assertEquals(collector.result, "test-content");
     }
 
+    @Test
+    void primitiveStringPulsarSchema() {
+        final String topicName =
+                "primitiveString-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
+        operator().createTopic(topicName, 1);
+        String expectedMessage = randomAlphabetic(10);
+        operator()
+                .sendMessage(
+                        TopicNameUtils.topicNameWithPartition(topicName, 0),
+                        Schema.STRING,
+                        expectedMessage);
+        PulsarSource<String> source =
+                createSource(topicName, new PulsarSchemaWrapper<>(Schema.STRING));
+        assertThatCode(() -> runPipeline(source, expectedMessage)).doesNotThrowAnyException();
+    }
+
+    @Test
+    void unversionedJsonStructPulsarSchema() {
+        final String topicName =
+                "unversionedJsonStruct-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
+        operator().createTopic(topicName, 1);
+        TestingUser expectedMessage = createRandomUser();
+        operator()
+                .sendMessage(
+                        TopicNameUtils.topicNameWithPartition(topicName, 0),
+                        Schema.JSON(TestingUser.class),
+                        expectedMessage);
+        PulsarSource<TestingUser> source =
+                createSource(
+                        topicName,
+                        new PulsarSchemaWrapper<>(
+                                Schema.JSON(TestingUser.class), TestingUser.class));
+        assertThatCode(() -> runPipeline(source, expectedMessage)).doesNotThrowAnyException();
+    }
+
+    @Test
+    void keyValueJsonStructPulsarSchema() {
+        final String topicName =
+                "keyValueJsonStruct-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
+        operator().createTopic(topicName, 1);
+        KeyValue<TestingUser, TestingUser> expectedMessage =
+                new KeyValue<>(createRandomUser(), createRandomUser());
+        operator()
+                .sendMessage(
+                        TopicNameUtils.topicNameWithPartition(topicName, 0),
+                        Schema.KeyValue(
+                                Schema.JSON(TestingUser.class), Schema.JSON(TestingUser.class)),
+                        expectedMessage);
+        PulsarSource<KeyValue<TestingUser, TestingUser>> source =
+                createSource(
+                        topicName,
+                        new PulsarSchemaWrapper<>(
+                                Schema.KeyValue(
+                                        Schema.JSON(TestingUser.class),
+                                        Schema.JSON(TestingUser.class)),
+                                TestingUser.class,
+                                TestingUser.class));
+        assertThatCode(() -> runPipeline(source, expectedMessage)).doesNotThrowAnyException();
+    }
+
+    @Test
+    void keyValueAvroStructPulsarSchema() {
+        final String topicName =
+                "keyValueAvroStruct-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
+        operator().createTopic(topicName, 1);
+        KeyValue<TestingUser, TestingUser> expectedMessage =
+                new KeyValue<>(createRandomUser(), createRandomUser());
+        operator()
+                .sendMessage(
+                        TopicNameUtils.topicNameWithPartition(topicName, 0),
+                        Schema.KeyValue(
+                                Schema.AVRO(TestingUser.class), Schema.AVRO(TestingUser.class)),
+                        expectedMessage);
+        PulsarSource<KeyValue<TestingUser, TestingUser>> source =
+                createSource(
+                        topicName,
+                        new PulsarSchemaWrapper<>(
+                                Schema.KeyValue(
+                                        Schema.AVRO(TestingUser.class),
+                                        Schema.AVRO(TestingUser.class)),
+                                TestingUser.class,
+                                TestingUser.class));
+        assertThatCode(() -> runPipeline(source, expectedMessage)).doesNotThrowAnyException();
+    }
+
+    @Test
+    void keyValuePrimitivePulsarSchema() {
+        final String topicName =
+                "keyValuePrimitive-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
+        operator().createTopic(topicName, 1);
+        KeyValue<String, Integer> expectedMessage = new KeyValue<>(randomAlphabetic(5), 5);
+        operator()
+                .sendMessage(
+                        TopicNameUtils.topicNameWithPartition(topicName, 0),
+                        Schema.KeyValue(Schema.STRING, Schema.INT32),
+                        expectedMessage);
+        PulsarSource<KeyValue<String, Integer>> source =
+                createSource(
+                        topicName,
+                        new PulsarSchemaWrapper<>(
+                                Schema.KeyValue(Schema.STRING, Schema.INT32),
+                                String.class,
+                                Integer.class));
+        assertThatCode(() -> runPipeline(source, expectedMessage)).doesNotThrowAnyException();
+    }
+
+    @Test
+    void keyValuePrimitiveKeyStructValuePulsarSchema() {
+        final String topicName =
+                "primitiveKeyStructValue-"
+                        + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
+        operator().createTopic(topicName, 1);
+        KeyValue<String, TestingUser> expectedMessage =
+                new KeyValue<>(randomAlphabetic(5), createRandomUser());
+        operator()
+                .sendMessage(
+                        TopicNameUtils.topicNameWithPartition(topicName, 0),
+                        Schema.KeyValue(Schema.STRING, Schema.JSON(TestingUser.class)),
+                        expectedMessage);
+        PulsarSource<KeyValue<String, TestingUser>> source =
+                createSource(
+                        topicName,
+                        new PulsarSchemaWrapper<>(
+                                Schema.KeyValue(Schema.STRING, Schema.JSON(TestingUser.class)),
+                                String.class,
+                                TestingUser.class));
+        assertThatCode(() -> runPipeline(source, expectedMessage)).doesNotThrowAnyException();
+    }
+
+    @Test
+    void keyValueStructKeyPrimitiveValuePulsarSchema() {
+        final String topicName =
+                "structKeyPrimitiveValue-"
+                        + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
+        operator().createTopic(topicName, 1);
+        KeyValue<TestingUser, String> expectedMessage =
+                new KeyValue<>(createRandomUser(), randomAlphabetic(5));
+        operator()
+                .sendMessage(
+                        TopicNameUtils.topicNameWithPartition(topicName, 0),
+                        Schema.KeyValue(Schema.JSON(TestingUser.class), Schema.STRING),
+                        expectedMessage);
+        PulsarSource<KeyValue<TestingUser, String>> source =
+                createSource(
+                        topicName,
+                        new PulsarSchemaWrapper<>(
+                                Schema.KeyValue(Schema.JSON(TestingUser.class), Schema.STRING),
+                                TestingUser.class,
+                                String.class));
+        assertThatCode(() -> runPipeline(source, expectedMessage)).doesNotThrowAnyException();
+    }
+
+    @Test
+    void simpleFlinkSchema() {
+        final String topicName =
+                "simpleFlinkSchema-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
+        operator().createTopic(topicName, 1);
+        String expectedMessage = randomAlphabetic(5);
+        operator()
+                .sendMessage(
+                        TopicNameUtils.topicNameWithPartition(topicName, 0),
+                        Schema.STRING,
+                        expectedMessage);
+        PulsarSource<String> source =
+                createSource(
+                        topicName,
+                        new PulsarDeserializationSchemaWrapper<>(new SimpleStringSchema()));
+        assertThatCode(() -> runPipeline(source, expectedMessage)).doesNotThrowAnyException();
+    }
+
+    private PulsarSource createSource(
+            String topicName, PulsarDeserializationSchema<?> deserializationSchema) {
+        return PulsarSource.builder()
+                .setDeserializationSchema(deserializationSchema)
+                .setServiceUrl(operator().serviceUrl())
+                .setAdminUrl(operator().adminUrl())
+                .setTopics(topicName)
+                .setSubscriptionName(topicName + "-subscription")
+                .setBoundedStopCursor(StopCursor.latest())
+                .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L)
+                .build();
+    }
+
+    private <T> void runPipeline(PulsarSource<T> source, T expected) throws Exception {
+        try (CloseableIterator<T> iterator =
+                StreamExecutionEnvironment.getExecutionEnvironment()
+                        .setParallelism(1)
+                        .fromSource(source, WatermarkStrategy.noWatermarks(), "testSource")
+                        .executeAndCollect()) {
+            assertThat(iterator).hasNext();
+            assertThat(iterator.next()).isEqualTo(expected);
+        }
+    }
+
+    /** A test POJO class. */
+    public static class TestingUser implements Serializable {
+        private static final long serialVersionUID = -1123545861004770003L;
+        public String name;
+        public Integer age;
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public Integer getAge() {
+            return age;
+        }
+
+        public void setAge(Integer age) {
+            this.age = age;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TestingUser that = (TestingUser) o;
+            return Objects.equals(name, that.name) && Objects.equals(age, that.age);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(name, age);
+        }
+    }
+
+    private TestingUser createRandomUser() {
+        TestingUser user = new TestingUser();
+        user.setName(randomAlphabetic(5));
+        user.setAge(ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE));
+        return user;
+    }
+
     /** Create a test message by given bytes. The message don't contains any meta data. */
     private <T> Message<byte[]> getMessage(
             T message, FunctionWithException<T, byte[], Exception> decoder) throws Exception {
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
index 6b9cf2d..a697f30 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
@@ -40,7 +40,6 @@ import java.util.Random;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
-import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.pulsarSchema;
 import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.toDeliveryGuarantee;
 
 /** Common sink test context for pulsar based test. */
@@ -74,7 +73,7 @@ public class PulsarSinkTestContext extends PulsarTestContext<String>
                 .setAdminUrl(operator.adminUrl())
                 .setTopics(topicName)
                 .setDeliveryGuarantee(guarantee)
-                .setSerializationSchema(pulsarSchema(schema))
+                .setSerializationSchema(schema)
                 .enableSchemaEvolution()
                 .setConfig(PULSAR_BATCHING_MAX_MESSAGES, 4)
                 .build();
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
index cb2fc91..e33839c 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
@@ -40,7 +40,6 @@ import java.util.stream.IntStream;
 import static java.util.stream.Collectors.toList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
 import static org.apache.pulsar.client.api.RegexSubscriptionMode.AllTopics;
 
 /**
@@ -61,7 +60,7 @@ public abstract class PulsarSourceTestContext extends PulsarTestContext<String>
     public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
         PulsarSourceBuilder<String> builder =
                 PulsarSource.builder()
-                        .setDeserializationSchema(pulsarSchema(schema))
+                        .setDeserializationSchema(schema)
                         .setServiceUrl(operator.serviceUrl())
                         .setAdminUrl(operator.adminUrl())
                         .setTopicPattern(topicPattern(), AllTopics)
diff --git a/pom.xml b/pom.xml
index 5e7bc52..a6cbc9f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,6 +74,7 @@ under the License.
         <byte-buddy.version>1.12.20</byte-buddy.version>
         <kryo.version>2.24.0</kryo.version>
         <objenesis.version>3.3</objenesis.version>
+        <jackson-bom.version>2.13.4.20221013</jackson-bom.version>
 
         <os-maven-plugin.version>1.7.0</os-maven-plugin.version>
         <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
@@ -250,6 +251,18 @@ under the License.
                 <version>${flink.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-avro</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-json</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
             <!-- Flink ArchUnit -->
 
             <dependency>
@@ -335,6 +348,14 @@ under the License.
                 <version>${archunit.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>com.fasterxml.jackson</groupId>
+                <artifactId>jackson-bom</artifactId>
+                <type>pom</type>
+                <scope>import</scope>
+                <version>${jackson-bom.version}</version>
+            </dependency>
+
             <!-- For dependency convergence -->
 
             <dependency>