You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/18 09:20:05 UTC

[incubator-inlong] branch master updated: [INLONG-2524][Feature][Sort] Support deserialization of json, canal and avro formatted data (#2525)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f8d471  [INLONG-2524][Feature][Sort] Support deserialization of json, canal and avro formatted data (#2525)
7f8d471 is described below

commit 7f8d4711e5ff1b1e8b0bc5931fb2e0c082f1facd
Author: Kevin Wen <ke...@gmail.com>
AuthorDate: Fri Feb 18 17:19:58 2022 +0800

    [INLONG-2524][Feature][Sort] Support deserialization of json, canal and avro formatted data (#2525)
---
 inlong-sort/pom.xml                                |   2 +-
 .../inlong/sort/configuration/Constants.java       |   2 -
 .../inlong/sort/protocol/DataFlowStorageInfo.java  |   6 +-
 .../org/apache/inlong/sort/protocol/FieldInfo.java |  12 +-
 .../AvroDeserializationInfo.java}                  |  27 +--
 .../deserialization/CanalDeserializationInfo.java  | 108 +++++++++++
 .../deserialization/CsvDeserializationInfo.java    |   2 +
 .../deserialization/DeserializationInfo.java       |   3 +
 .../JsonDeserializationInfo.java}                  |  27 +--
 .../deserialization/KvDeserializationInfo.java     |   2 +
 .../inlong/sort/protocol/sink/DorisSinkInfo.java   |   2 -
 .../inlong/sort/protocol/sink/HiveSinkInfo.java    |  28 ++-
 .../inlong/sort/protocol/sink/IcebergSinkInfo.java |   2 +
 .../AvroDeserializationInfoTest.java}              |  29 +--
 .../CanalDeserializationInfoTest.java}             |  34 ++--
 .../JsonDeserializationInfoTest.java}              |  29 +--
 .../inlong/sort/formats/base/TableFormatUtils.java |   9 +
 .../inlong/sort/formats/common/Constants.java}     |  24 +--
 .../inlong/sort/formats/common/DateFormatInfo.java |   9 +-
 .../inlong/sort/formats/common/FormatInfo.java     |   3 +-
 .../common/LocalZonedTimestampFormatInfo.java}     |  51 ++---
 .../common/LocalZonedTimestampTypeInfo.java}       |  35 ++--
 .../inlong/sort/formats/common/TimeFormatInfo.java |  11 +-
 .../sort/formats/common/TimestampFormatInfo.java   |   9 +-
 .../inlong/sort/formats/common/TypeInfo.java       |   3 +-
 .../common/LocalZonedTimestampFormatInfoTest.java} |  26 +--
 inlong-sort/sort-single-tenant/pom.xml             |  10 +
 .../inlong/sort/singletenant/flink/Entrance.java   |  68 +++++--
 .../sort/singletenant/flink/SerializedRecord.java  |  58 ++++++
 .../clickhouse/ClickhouseRowSinkFunction.java      |   2 +
 .../CanalDeserializationSchemaBuilder.java         | 137 ++++++++++++++
 ...stomDateFormatDeserializationSchemaWrapper.java | 125 ++++++++++++
 .../deserialization/DeserializationFunction.java   |  66 +++++++
 .../DeserializationSchemaFactory.java              |  96 ++++++++++
 .../flink/deserialization/ListCollector.java}      |  33 ++--
 .../RowDataToRowDeserializationSchemaWrapper.java  |  79 ++++++++
 .../singletenant/flink/kafka/KafkaSinkBuilder.java |  49 +----
 .../flink/pulsar/PulsarSourceBuilder.java          |  78 ++++++++
 ...y.java => CanalSerializationSchemaBuilder.java} |  43 ++---
 ...CustomDateFormatSerializationSchemaWrapper.java |  90 +++++++++
 .../RowToRowDataSerializationSchemaWrapper.java    |  52 +++++
 ...actory.java => SerializationSchemaFactory.java} |  39 ++--
 .../sort/singletenant/flink/utils/CommonUtils.java | 103 +++++++++-
 .../deserialization/AvroDeserializationTest.java   | 102 ++++++++++
 .../deserialization/CanalDeserializationTest.java  | 209 +++++++++++++++++++++
 ...DateFormatDeserializationSchemaWrapperTest.java |  65 +++++++
 .../DeserializationFunctionTest.java               |  53 ++++++
 .../deserialization/JsonDeserializationTest.java   | 112 +++++++++++
 .../flink/kafka/KafkaSinkTestBase.java             |  23 ++-
 .../flink/kafka/KafkaSinkTestBaseForRow.java       |  45 -----
 .../flink/kafka/KafkaSinkTestBaseForRowData.java   |  43 -----
 .../flink/kafka/RowToAvroKafkaSinkTest.java        |   8 +-
 .../flink/kafka/RowToCanalKafkaSinkTest.java       |   9 +-
 .../flink/kafka/RowToJsonKafkaSinkTest.java        |   9 +-
 .../flink/kafka/RowToStringKafkaSinkTest.java      |   9 +-
 ...omDateFormatSerializationSchemaWrapperTest.java |  66 +++++++
 .../singletenant/flink/utils/CommonUtilsTest.java  |  17 ++
 57 files changed, 1963 insertions(+), 430 deletions(-)

diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index d242dd2..baf2db4 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -55,7 +55,7 @@
         <tubemq.version>${project.version}</tubemq.version>
         <commons-lang3.version>3.3.2</commons-lang3.version>
         <clickhouse-jdbc.version>0.3.0</clickhouse-jdbc.version>
-        <flink.jackson.version>2.9.8-7.0</flink.jackson.version>
+        <flink.jackson.version>2.12.1-13.0</flink.jackson.version>
         <jsr.version>3.0.2</jsr.version>
         <snappy.version>1.1.4</snappy.version>
         <hadoop.version>2.8.5</hadoop.version>
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index a0ad42b..c6e8067 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -57,8 +57,6 @@ public class Constants {
 
     public static final String DESERIALIZATION_SCHEMA_UID = "deserialization_schema_uid";
 
-    public static final String CONVERTER_UID = "converter_uid";
-
     public static final String SINK_UID = "sink_uid";
 
     /**
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowStorageInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowStorageInfo.java
index 95d99d1..4ef0d63 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowStorageInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowStorageInfo.java
@@ -18,11 +18,15 @@
 package org.apache.inlong.sort.protocol;
 
 import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
 import java.util.Objects;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DataFlowStorageInfo {
+public class DataFlowStorageInfo implements Serializable {
+
+    private static final long serialVersionUID = -2785142086976967367L;
 
     public enum StorageType {
         ZK,
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
index 42abe16..f8b1cb9 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
@@ -22,14 +22,18 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.io.Serializable;
 import java.util.Objects;
 
-public class FieldInfo {
+public class FieldInfo implements Serializable {
+
+    private static final long serialVersionUID = 5871970550803344673L;
+
     @JsonProperty("name")
     private final String name;
 
     @JsonProperty("format_info")
-    private final FormatInfo formatInfo;
+    private FormatInfo formatInfo;
 
     @JsonCreator
     public FieldInfo(
@@ -47,6 +51,10 @@ public class FieldInfo {
         return formatInfo;
     }
 
+    public void setFormatInfo(FormatInfo formatInfo) {
+        this.formatInfo = formatInfo;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/AvroDeserializationInfo.java
similarity index 50%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
copy to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/AvroDeserializationInfo.java
index 5ab6c71..2c5f011 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/AvroDeserializationInfo.java
@@ -16,28 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.protocol.sink;
+package org.apache.inlong.sort.protocol.deserialization;
 
-import com.google.common.base.Preconditions;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.inlong.sort.protocol.FieldInfo;
+public class AvroDeserializationInfo implements DeserializationInfo {
 
-public class IcebergSinkInfo extends SinkInfo {
+    private static final long serialVersionUID = -5344203248610337314L;
 
-    @JsonProperty("table_location")
-    private final String tableLocation;
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
 
-    @JsonCreator
-    public IcebergSinkInfo(
-            @JsonProperty("fields") FieldInfo[] fields,
-            @JsonProperty("table_location") String tableLocation) {
-        super(fields);
-        this.tableLocation = Preconditions.checkNotNull(tableLocation);
+        return o != null && getClass() == o.getClass();
     }
 
-    @JsonProperty("table_location")
-    public String getTableLocation() {
-        return tableLocation;
-    }
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfo.java
new file mode 100644
index 0000000..a6ac92c
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfo.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.deserialization;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class CanalDeserializationInfo implements DeserializationInfo {
+
+    private static final long serialVersionUID = -5344203248610337314L;
+
+    @JsonProperty("database")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final String database;
+
+    @JsonProperty("table")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final String table;
+
+    @JsonProperty("ignore_parse_errors")
+    private final boolean ignoreParseErrors;
+
+    @JsonProperty("timestamp_format_standard")
+    private final String timestampFormatStandard;
+
+    @JsonProperty("include_metadata")
+    private final boolean includeMetadata;
+
+    @JsonCreator
+    public CanalDeserializationInfo(
+            @JsonProperty("database") String database,
+            @JsonProperty("table") String table,
+            @JsonProperty("ignore_parse_errors") boolean ignoreParseErrors,
+            @JsonProperty("timestamp_format_standard") String timestampFormatStandard,
+            @JsonProperty("include_metadata") boolean includeMetadata) {
+        this.database = database;
+        this.table = table;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.timestampFormatStandard = timestampFormatStandard;
+        this.includeMetadata = includeMetadata;
+    }
+
+    @JsonProperty("database")
+    public String getDatabase() {
+        return database;
+    }
+
+    @JsonProperty("table")
+    public String getTable() {
+        return table;
+    }
+
+    @JsonProperty("ignore_parse_errors")
+    public boolean isIgnoreParseErrors() {
+        return ignoreParseErrors;
+    }
+
+    @JsonProperty("timestamp_format_standard")
+    public String getTimestampFormatStandard() {
+        return timestampFormatStandard;
+    }
+
+    @JsonProperty("include_metadata")
+    public boolean isIncludeMetadata() {
+        return includeMetadata;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CanalDeserializationInfo that = (CanalDeserializationInfo) o;
+        return ignoreParseErrors == that.ignoreParseErrors
+                && includeMetadata == that.includeMetadata
+                && Objects.equals(database, that.database)
+                && Objects.equals(table, that.table)
+                && Objects.equals(timestampFormatStandard, that.timestampFormatStandard);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(database, table, ignoreParseErrors, timestampFormatStandard, includeMetadata);
+    }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
index fafae28..2c3fc0b 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
@@ -25,6 +25,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
  */
 public class CsvDeserializationInfo implements DeserializationInfo {
 
+    private static final long serialVersionUID = -5035426390567887081L;
+
     private final char splitter;
 
     // TODO: support mapping index to field
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
index 844f506..e1599ff 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
@@ -32,6 +32,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
         property = "type")
 @JsonSubTypes({
         @Type(value = CsvDeserializationInfo.class, name = "csv"),
+        @Type(value = AvroDeserializationInfo.class, name = "avro"),
+        @Type(value = JsonDeserializationInfo.class, name = "json"),
+        @Type(value = CanalDeserializationInfo.class, name = "canal"),
         @Type(value = InLongMsgCsvDeserializationInfo.class, name = "inlongmsg_csv"),
         @Type(value = InLongMsgCsv2DeserializationInfo.class, name = "inlongmsg_csv2"),
         @Type(value = InLongMsgKvDeserializationInfo.class, name = "inlongmsg_kv"),
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java
similarity index 50%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
copy to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java
index 5ab6c71..6c73d9a 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java
@@ -16,28 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.protocol.sink;
+package org.apache.inlong.sort.protocol.deserialization;
 
-import com.google.common.base.Preconditions;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.inlong.sort.protocol.FieldInfo;
+public class JsonDeserializationInfo implements DeserializationInfo {
 
-public class IcebergSinkInfo extends SinkInfo {
+    private static final long serialVersionUID = -5344203248610337314L;
 
-    @JsonProperty("table_location")
-    private final String tableLocation;
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
 
-    @JsonCreator
-    public IcebergSinkInfo(
-            @JsonProperty("fields") FieldInfo[] fields,
-            @JsonProperty("table_location") String tableLocation) {
-        super(fields);
-        this.tableLocation = Preconditions.checkNotNull(tableLocation);
+        return o != null && getClass() == o.getClass();
     }
 
-    @JsonProperty("table_location")
-    public String getTableLocation() {
-        return tableLocation;
-    }
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
index aadde75..bed2859 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
@@ -27,6 +27,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
  */
 public class KvDeserializationInfo implements DeserializationInfo {
 
+    private static final long serialVersionUID = 1976031542480774581L;
+
     private final char entrySplitter;
 
     private final char kvSplitter;
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/DorisSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/DorisSinkInfo.java
index 0b0d892..d6f8b05 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/DorisSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/DorisSinkInfo.java
@@ -24,8 +24,6 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 
 public class DorisSinkInfo extends SinkInfo {
 
-
-
     private static final long serialVersionUID = 1L;
 
     @JsonProperty("table_identifier")
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
index 89ee544..381a1f1 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
@@ -134,7 +134,10 @@ public class HiveSinkInfo extends SinkInfo {
     @JsonSubTypes({
             @Type(value = HiveTimePartitionInfo.class, name = "time"),
             @Type(value = HiveFieldPartitionInfo.class, name = "field")})
-    public abstract static class HivePartitionInfo {
+    public abstract static class HivePartitionInfo implements Serializable {
+
+        private static final long serialVersionUID = -4276796328049383208L;
+
         @JsonProperty("field_name")
         private final String fieldName;
 
@@ -151,6 +154,8 @@ public class HiveSinkInfo extends SinkInfo {
 
     public static class HiveTimePartitionInfo extends HivePartitionInfo {
 
+        private static final long serialVersionUID = -2475470848828020684L;
+
         @JsonProperty("date_format")
         private final String format;
 
@@ -169,6 +174,8 @@ public class HiveSinkInfo extends SinkInfo {
 
     public static class HiveFieldPartitionInfo extends HivePartitionInfo {
 
+        private static final long serialVersionUID = 9208133177416395986L;
+
         public HiveFieldPartitionInfo(
                 @JsonProperty("field_name") String fieldName) {
             super(fieldName);
@@ -186,10 +193,12 @@ public class HiveSinkInfo extends SinkInfo {
             @Type(value = OrcFileFormat.class, name = "orc"),
             @Type(value = SequenceFileFormat.class, name = "sequence"),
             @Type(value = ParquetFileFormat.class, name = "parquet"),})
-    public interface HiveFileFormat {
+    public interface HiveFileFormat extends Serializable {
     }
 
-    public static class TextFileFormat implements HiveFileFormat, Serializable {
+    public static class TextFileFormat implements HiveFileFormat {
+
+        private static final long serialVersionUID = 522000219325150443L;
 
         @JsonProperty("splitter")
         private final Character splitter;
@@ -221,7 +230,9 @@ public class HiveSinkInfo extends SinkInfo {
         }
     }
 
-    public static class OrcFileFormat implements HiveFileFormat, Serializable {
+    public static class OrcFileFormat implements HiveFileFormat {
+
+        private static final long serialVersionUID = -6483139337919483030L;
 
         @JsonProperty("batch_size")
         private final int batchSize;
@@ -237,7 +248,9 @@ public class HiveSinkInfo extends SinkInfo {
         }
     }
 
-    public static class SequenceFileFormat implements HiveFileFormat, Serializable {
+    public static class SequenceFileFormat implements HiveFileFormat {
+
+        private static final long serialVersionUID = 263836241053911625L;
 
         @JsonProperty("splitter")
         private final Character splitter;
@@ -263,9 +276,12 @@ public class HiveSinkInfo extends SinkInfo {
         }
     }
 
-    public static class ParquetFileFormat implements HiveFileFormat, Serializable {
+    public static class ParquetFileFormat implements HiveFileFormat {
+
+        private static final long serialVersionUID = 3400568099604670179L;
 
         public ParquetFileFormat() {
         }
+
     }
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
index 5ab6c71..a7d4ff8 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
@@ -25,6 +25,8 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 
 public class IcebergSinkInfo extends SinkInfo {
 
+    private static final long serialVersionUID = 3682587731889008343L;
+
     @JsonProperty("table_location")
     private final String tableLocation;
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/AvroDeserializationInfoTest.java
similarity index 50%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
copy to inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/AvroDeserializationInfoTest.java
index 5ab6c71..6002786 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/AvroDeserializationInfoTest.java
@@ -16,28 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.protocol.sink;
+package org.apache.inlong.sort.protocol.deserialization;
 
-import com.google.common.base.Preconditions;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.ProtocolBaseTest;
 
-public class IcebergSinkInfo extends SinkInfo {
+public class AvroDeserializationInfoTest extends ProtocolBaseTest {
 
-    @JsonProperty("table_location")
-    private final String tableLocation;
-
-    @JsonCreator
-    public IcebergSinkInfo(
-            @JsonProperty("fields") FieldInfo[] fields,
-            @JsonProperty("table_location") String tableLocation) {
-        super(fields);
-        this.tableLocation = Preconditions.checkNotNull(tableLocation);
+    @Override
+    public void init() {
+        expectedObject = new AvroDeserializationInfo();
+        expectedJson = "{\"type\":\"avro\"}";
+        equalObj1 = expectedObject;
+        equalObj2 = new AvroDeserializationInfo();
+        unequalObj = "";
     }
 
-    @JsonProperty("table_location")
-    public String getTableLocation() {
-        return tableLocation;
-    }
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfoTest.java
similarity index 50%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
copy to inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfoTest.java
index 5ab6c71..613d561 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfoTest.java
@@ -16,28 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.protocol.sink;
+package org.apache.inlong.sort.protocol.deserialization;
 
-import com.google.common.base.Preconditions;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.ProtocolBaseTest;
 
-public class IcebergSinkInfo extends SinkInfo {
+public class CanalDeserializationInfoTest extends ProtocolBaseTest {
 
-    @JsonProperty("table_location")
-    private final String tableLocation;
-
-    @JsonCreator
-    public IcebergSinkInfo(
-            @JsonProperty("fields") FieldInfo[] fields,
-            @JsonProperty("table_location") String tableLocation) {
-        super(fields);
-        this.tableLocation = Preconditions.checkNotNull(tableLocation);
+    @Override
+    public void init() {
+        expectedObject = new CanalDeserializationInfo(null, null, false, "SQL", true);
+        expectedJson = "{\n"
+                + "  \"type\" : \"canal\",\n"
+                + "  \"ignore_parse_errors\" : false,\n"
+                + "  \"timestamp_format_standard\" : \"SQL\",\n"
+                + "  \"include_metadata\" : true\n"
+                + "}";
+        equalObj1 = expectedObject;
+        equalObj2 = new CanalDeserializationInfo(null, null, false, "SQL", true);
+        unequalObj = new CanalDeserializationInfo(null, null, false, "SQL", false);
     }
 
-    @JsonProperty("table_location")
-    public String getTableLocation() {
-        return tableLocation;
-    }
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfoTest.java
similarity index 50%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
copy to inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfoTest.java
index 5ab6c71..1c13903 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/IcebergSinkInfo.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfoTest.java
@@ -16,28 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.protocol.sink;
+package org.apache.inlong.sort.protocol.deserialization;
 
-import com.google.common.base.Preconditions;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.ProtocolBaseTest;
 
-public class IcebergSinkInfo extends SinkInfo {
+public class JsonDeserializationInfoTest extends ProtocolBaseTest {
 
-    @JsonProperty("table_location")
-    private final String tableLocation;
-
-    @JsonCreator
-    public IcebergSinkInfo(
-            @JsonProperty("fields") FieldInfo[] fields,
-            @JsonProperty("table_location") String tableLocation) {
-        super(fields);
-        this.tableLocation = Preconditions.checkNotNull(tableLocation);
+    @Override
+    public void init() {
+        expectedObject = new JsonDeserializationInfo();
+        expectedJson = "{\"type\":\"json\"}";
+        equalObj1 = expectedObject;
+        equalObj2 = new JsonDeserializationInfo();
+        unequalObj = "";
     }
 
-    @JsonProperty("table_location")
-    public String getTableLocation() {
-        return tableLocation;
-    }
 }
diff --git a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index 40de211..f53121b 100644
--- a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++ b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -46,6 +46,7 @@ import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.NullType;
@@ -77,6 +78,8 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.formats.common.FormatUtils;
 import org.apache.inlong.sort.formats.common.IntFormatInfo;
 import org.apache.inlong.sort.formats.common.IntTypeInfo;
+import org.apache.inlong.sort.formats.common.LocalZonedTimestampFormatInfo;
+import org.apache.inlong.sort.formats.common.LocalZonedTimestampTypeInfo;
 import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.LongTypeInfo;
 import org.apache.inlong.sort.formats.common.MapFormatInfo;
@@ -276,6 +279,8 @@ public class TableFormatUtils {
             return new TimeFormatInfo();
         } else if (logicalType instanceof TimestampType) {
             return new TimestampFormatInfo();
+        } else if (logicalType instanceof LocalZonedTimestampType) {
+            return new LocalZonedTimestampFormatInfo();
         } else if (logicalType instanceof ArrayType) {
             ArrayType arrayType = (ArrayType) logicalType;
             LogicalType elementType = arrayType.getElementType();
@@ -344,6 +349,8 @@ public class TableFormatUtils {
             return new DateType();
         } else if (formatInfo instanceof TimestampFormatInfo) {
             return new TimestampType(DEFAULT_PRECISION_FOR_TIMESTAMP);
+        } else if (formatInfo instanceof LocalZonedTimestampFormatInfo) {
+            return new LocalZonedTimestampType();
         } else if (formatInfo instanceof ArrayFormatInfo) {
             FormatInfo elementFormatInfo = ((ArrayFormatInfo) formatInfo).getElementFormatInfo();
             return new ArrayType(deriveLogicalType(elementFormatInfo));
@@ -402,6 +409,8 @@ public class TableFormatUtils {
             return Types.SQL_TIME;
         } else if (typeInfo instanceof TimestampTypeInfo) {
             return Types.SQL_TIMESTAMP;
+        } else if (typeInfo instanceof LocalZonedTimestampTypeInfo) {
+            return Types.LOCAL_DATE_TIME;
         } else if (typeInfo instanceof BinaryTypeInfo) {
             return Types.PRIMITIVE_ARRAY(Types.BYTE);
         } else if (typeInfo instanceof ArrayTypeInfo) {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/Constants.java
similarity index 55%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
copy to inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/Constants.java
index fafae28..d196ba6 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/Constants.java
@@ -15,28 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.protocol.deserialization;
+package org.apache.inlong.sort.formats.common;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+public class Constants {
 
-/**
- * .
- */
-public class CsvDeserializationInfo implements DeserializationInfo {
-
-    private final char splitter;
-
-    // TODO: support mapping index to field
+    public static final String DATE_AND_TIME_STANDARD_SQL = "SQL";
 
-    @JsonCreator
-    public CsvDeserializationInfo(
-            @JsonProperty("splitter") char splitter) {
-        this.splitter = splitter;
-    }
+    public static final String DATE_AND_TIME_STANDARD_ISO_8601 = "ISO_8601";
 
-    @JsonProperty("splitter")
-    public char getSplitter() {
-        return splitter;
-    }
 }
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/DateFormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/DateFormatInfo.java
index 7bb18cf..7b3488f 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/DateFormatInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/DateFormatInfo.java
@@ -29,6 +29,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_ISO_8601;
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_SQL;
+
 /**
  * The format information for {@link Date}s.
  */
@@ -53,8 +56,10 @@ public class DateFormatInfo implements BasicFormatInfo<Date> {
         this.format = format;
 
         if (!format.equals("SECONDS")
-                    && !format.equals("MILLIS")
-                    && !format.equals("MICROS")) {
+                && !format.equals("MILLIS")
+                && !format.equals("MICROS")
+                && !DATE_AND_TIME_STANDARD_SQL.equals(format)
+                && !DATE_AND_TIME_STANDARD_ISO_8601.equals(format)) {
             this.simpleDateFormat = new SimpleDateFormat(format);
         } else {
             this.simpleDateFormat = null;
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
index 79b8411..4272fa4 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
@@ -44,7 +44,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
         @JsonSubTypes.Type(name = "map", value = MapFormatInfo.class),
         @JsonSubTypes.Type(name = "row", value = RowFormatInfo.class),
         @JsonSubTypes.Type(name = "binary", value = BinaryFormatInfo.class),
-        @JsonSubTypes.Type(name = "null", value = NullFormatInfo.class)
+        @JsonSubTypes.Type(name = "null", value = NullFormatInfo.class),
+        @JsonSubTypes.Type(name = "local_zoned_timestamp", value = LocalZonedTimestampFormatInfo.class)
 })
 public interface FormatInfo extends Serializable {
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfo.java
similarity index 51%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
copy to inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfo.java
index 42abe16..9afc8da 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfo.java
@@ -15,36 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.protocol;
-
-import com.google.common.base.Preconditions;
-import org.apache.inlong.sort.formats.common.FormatInfo;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+package org.apache.inlong.sort.formats.common;
 
 import java.util.Objects;
 
-public class FieldInfo {
-    @JsonProperty("name")
-    private final String name;
+public class LocalZonedTimestampFormatInfo implements FormatInfo {
+
+    private static final long serialVersionUID = -7501810151856898046L;
 
-    @JsonProperty("format_info")
-    private final FormatInfo formatInfo;
+    private final String format;
 
-    @JsonCreator
-    public FieldInfo(
-            @JsonProperty("name") String name,
-            @JsonProperty("format_info") FormatInfo formatInfo) {
-        this.name = Preconditions.checkNotNull(name);
-        this.formatInfo = Preconditions.checkNotNull(formatInfo);
+    public LocalZonedTimestampFormatInfo(String format) {
+        this.format = format;
     }
 
-    public String getName() {
-        return name;
+    public LocalZonedTimestampFormatInfo() {
+        this("yyyy-MM-dd HH:mm:ss");
     }
 
-    public FormatInfo getFormatInfo() {
-        return formatInfo;
+    public String getFormat() {
+        return format;
+    }
+
+    @Override
+    public TypeInfo getTypeInfo() {
+        return LocalZonedTimestampTypeInfo.INSTANCE;
     }
 
     @Override
@@ -55,12 +50,20 @@ public class FieldInfo {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        FieldInfo fieldInfo = (FieldInfo) o;
-        return name.equals(fieldInfo.name) && formatInfo.equals(fieldInfo.formatInfo);
+        LocalZonedTimestampFormatInfo that = (LocalZonedTimestampFormatInfo) o;
+        return Objects.equals(format, that.format);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, formatInfo);
+        return Objects.hash(format);
     }
+
+    @Override
+    public String toString() {
+        return "LocalZonedTimestampFormatInfo{"
+                + "format='" + format + '\''
+                + '}';
+    }
+
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampTypeInfo.java
similarity index 56%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
copy to inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampTypeInfo.java
index fafae28..cdc66f8 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampTypeInfo.java
@@ -15,28 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.protocol.deserialization;
+package org.apache.inlong.sort.formats.common;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+public class LocalZonedTimestampTypeInfo implements TypeInfo {
 
-/**
- * .
- */
-public class CsvDeserializationInfo implements DeserializationInfo {
+    private static final long serialVersionUID = -837081640968386059L;
+
+    public static final LocalZonedTimestampTypeInfo INSTANCE = new LocalZonedTimestampTypeInfo();
 
-    private final char splitter;
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
 
-    // TODO: support mapping index to field
+        return o != null && getClass() == o.getClass();
+    }
 
-    @JsonCreator
-    public CsvDeserializationInfo(
-            @JsonProperty("splitter") char splitter) {
-        this.splitter = splitter;
+    @Override
+    public int hashCode() {
+        return getClass().hashCode();
     }
 
-    @JsonProperty("splitter")
-    public char getSplitter() {
-        return splitter;
+    @Override
+    public String toString() {
+        return "LocalZonedTimestampTypeInfo";
     }
+
 }
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimeFormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimeFormatInfo.java
index f991b32..cd09759 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimeFormatInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimeFormatInfo.java
@@ -30,6 +30,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_ISO_8601;
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_SQL;
+
 /**
  * The format information for {@link Time}s.
  */
@@ -54,8 +57,10 @@ public class TimeFormatInfo implements BasicFormatInfo<Time> {
         this.format = format;
 
         if (!format.equals("MICROS")
-                    && !format.equals("MILLIS")
-                    && !format.equals("SECONDS")) {
+                && !format.equals("MILLIS")
+                && !format.equals("SECONDS")
+                && !DATE_AND_TIME_STANDARD_SQL.equals(format)
+                && !DATE_AND_TIME_STANDARD_ISO_8601.equals(format)) {
             this.simpleDateFormat = new SimpleDateFormat(format);
         } else {
             this.simpleDateFormat = null;
@@ -63,7 +68,7 @@ public class TimeFormatInfo implements BasicFormatInfo<Time> {
     }
 
     public TimeFormatInfo() {
-        this("hh:mm:ss");
+        this("HH:mm:ss");
     }
 
     @Nonnull
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
index ac7c307..d26373c 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
@@ -30,6 +30,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_ISO_8601;
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_SQL;
+
 /**
  * The format information for {@link Timestamp}s.
  */
@@ -55,7 +58,9 @@ public class TimestampFormatInfo implements BasicFormatInfo<Timestamp> {
 
         if (!format.equals("MICROS")
                 && !format.equals("MILLIS")
-                && !format.equals("SECONDS")) {
+                && !format.equals("SECONDS")
+                && !DATE_AND_TIME_STANDARD_SQL.equals(format)
+                && !DATE_AND_TIME_STANDARD_ISO_8601.equals(format)) {
             this.simpleDateFormat = new SimpleDateFormat(format);
         } else {
             this.simpleDateFormat = null;
@@ -63,7 +68,7 @@ public class TimestampFormatInfo implements BasicFormatInfo<Timestamp> {
     }
 
     public TimestampFormatInfo() {
-        this("yyyy-MM-dd hh:mm:ss");
+        this("yyyy-MM-dd HH:mm:ss");
     }
 
     @Nonnull
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java
index 626f620..2eaef3c 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java
@@ -43,7 +43,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
         @JsonSubTypes.Type(name = "map", value = MapTypeInfo.class),
         @JsonSubTypes.Type(name = "row", value = RowTypeInfo.class),
         @JsonSubTypes.Type(name = "binary", value = BinaryTypeInfo.class),
-        @JsonSubTypes.Type(name = "null", value = NullTypeInfo.class)
+        @JsonSubTypes.Type(name = "null", value = NullTypeInfo.class),
+        @JsonSubTypes.Type(name = "local_zoned_timestamp", value = LocalZonedTimestampTypeInfo.class)
 })
 public interface TypeInfo extends Serializable {
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfoTest.java
similarity index 56%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
copy to inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfoTest.java
index fafae28..4389b03 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfoTest.java
@@ -15,28 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.protocol.deserialization;
+package org.apache.inlong.sort.formats.common;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Collection;
+import java.util.Collections;
 
-/**
- * .
- */
-public class CsvDeserializationInfo implements DeserializationInfo {
-
-    private final char splitter;
+public class LocalZonedTimestampFormatInfoTest extends FormatInfoTestBase {
 
-    // TODO: support mapping index to field
-
-    @JsonCreator
-    public CsvDeserializationInfo(
-            @JsonProperty("splitter") char splitter) {
-        this.splitter = splitter;
+    @Override
+    Collection<FormatInfo> createFormatInfos() {
+        return Collections.singleton(new LocalZonedTimestampFormatInfo());
     }
 
-    @JsonProperty("splitter")
-    public char getSplitter() {
-        return splitter;
-    }
 }
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index cb8e925..200e03e 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -120,6 +120,16 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-jackson</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>inlong-common</artifactId>
+        </dependency>
+
         <!-- test -->
         <dependency>
             <groupId>org.apache.kafka</groupId>
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index 330cf72..e08900c 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -18,12 +18,16 @@
 package org.apache.inlong.sort.singletenant.flink;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSinkStream;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
+import static org.apache.inlong.sort.singletenant.flink.pulsar.PulsarSourceBuilder.buildPulsarSource;
 
-import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -43,6 +47,9 @@ import org.apache.inlong.sort.protocol.sink.SinkInfo;
 import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
 import org.apache.inlong.sort.protocol.source.SourceInfo;
 import org.apache.inlong.sort.singletenant.flink.clickhouse.ClickhouseRowSinkFunction;
+import org.apache.inlong.sort.singletenant.flink.deserialization.DeserializationFunction;
+import org.apache.inlong.sort.singletenant.flink.deserialization.DeserializationSchemaFactory;
+import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
 import org.apache.inlong.sort.singletenant.flink.utils.CommonUtils;
 import org.apache.inlong.sort.util.ParameterTool;
 
@@ -62,14 +69,18 @@ public class Entrance {
         env.getCheckpointConfig().setCheckpointTimeout(config.getInteger(Constants.CHECKPOINT_TIMEOUT_MS));
         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 
-        DataStream<Row> sourceStream = buildSourceStream(
+        DataStream<SerializedRecord> sourceStream = buildSourceStream(
                 env,
                 config,
-                dataFlowInfo.getSourceInfo()
+                dataFlowInfo.getSourceInfo(),
+                dataFlowInfo.getProperties()
         );
 
+        DataStream<Row> deserializedStream =
+                buildDeserializationStream(sourceStream, dataFlowInfo.getSourceInfo(), config);
+
         buildSinkStream(
-                sourceStream,
+                deserializedStream,
                 config,
                 dataFlowInfo.getSinkInfo(),
                 dataFlowInfo.getProperties(),
@@ -83,24 +94,40 @@ public class Entrance {
         return objectMapper.readValue(new File(fileName), DataFlowInfo.class);
     }
 
-    private static DataStream<Row> buildSourceStream(
+    private static DataStream<SerializedRecord> buildSourceStream(
             StreamExecutionEnvironment env,
             Configuration config,
-            SourceInfo sourceInfo) {
+            SourceInfo sourceInfo,
+            Map<String, Object> properties) {
         final String sourceType = checkNotNull(config.getString(Constants.SOURCE_TYPE));
         final int sourceParallelism = config.getInteger(Constants.SOURCE_PARALLELISM);
 
-        DataStream<Row> sourceStream = null;
         if (sourceType.equals(Constants.SOURCE_TYPE_PULSAR)) {
-            Preconditions.checkState(sourceInfo instanceof PulsarSourceInfo);
+            checkState(sourceInfo instanceof PulsarSourceInfo);
             PulsarSourceInfo pulsarSourceInfo = (PulsarSourceInfo) sourceInfo;
 
-            // TODO : implement pulsar source function
+            return env.addSource(buildPulsarSource(pulsarSourceInfo, config, properties))
+                    .uid(Constants.SOURCE_UID)
+                    .name("Pulsar source")
+                    .setParallelism(sourceParallelism)
+                    .rebalance();
         } else {
             throw new IllegalArgumentException("Unsupported source type " + sourceType);
         }
+    }
 
-        return sourceStream;
+    private static DataStream<Row> buildDeserializationStream(
+            DataStream<SerializedRecord> sourceStream,
+            SourceInfo sourceInfo,
+            Configuration config
+    ) throws IOException, ClassNotFoundException {
+        DeserializationSchema<Row> schema =
+                DeserializationSchemaFactory.build(sourceInfo.getFields(), sourceInfo.getDeserializationInfo());
+        DeserializationFunction function = new DeserializationFunction(schema);
+        return sourceStream.process(function)
+                .uid(Constants.DESERIALIZATION_SCHEMA_UID)
+                .name("Deserialization")
+                .setParallelism(config.getInteger(Constants.DESERIALIZATION_PARALLELISM));
     }
 
     private static void buildSinkStream(
@@ -108,14 +135,14 @@ public class Entrance {
             Configuration config,
             SinkInfo sinkInfo,
             Map<String, Object> properties,
-            long dataflowId) {
+            long dataflowId) throws IOException, ClassNotFoundException {
         final String sinkType = checkNotNull(config.getString(Constants.SINK_TYPE));
         final int sinkParallelism = config.getInteger(Constants.SINK_PARALLELISM);
 
         // TODO : implement sink functions below
         switch (sinkType) {
             case Constants.SINK_TYPE_CLICKHOUSE:
-                Preconditions.checkState(sinkInfo instanceof ClickHouseSinkInfo);
+                checkState(sinkInfo instanceof ClickHouseSinkInfo);
                 ClickHouseSinkInfo clickHouseSinkInfo = (ClickHouseSinkInfo) sinkInfo;
 
                 sourceStream.addSink(new ClickhouseRowSinkFunction(clickHouseSinkInfo))
@@ -124,7 +151,7 @@ public class Entrance {
                         .setParallelism(sinkParallelism);
                 break;
             case Constants.SINK_TYPE_HIVE:
-                Preconditions.checkState(sinkInfo instanceof HiveSinkInfo);
+                checkState(sinkInfo instanceof HiveSinkInfo);
                 HiveSinkInfo hiveSinkInfo = (HiveSinkInfo) sinkInfo;
 
                 if (hiveSinkInfo.getPartitions().length == 0) {
@@ -147,19 +174,26 @@ public class Entrance {
 
                 break;
             case Constants.SINK_TYPE_ICEBERG:
-                Preconditions.checkState(sinkInfo instanceof IcebergSinkInfo);
+                checkState(sinkInfo instanceof IcebergSinkInfo);
                 IcebergSinkInfo icebergSinkInfo = (IcebergSinkInfo) sinkInfo;
                 TableLoader tableLoader = TableLoader.fromHadoopTable(
                         icebergSinkInfo.getTableLocation(),
                         new org.apache.hadoop.conf.Configuration());
 
-                FlinkSink.forRow(sourceStream, CommonUtils.getTableSchema(sinkInfo))
+                FlinkSink.forRow(sourceStream, CommonUtils.getTableSchema(sinkInfo.getFields()))
                         .tableLoader(tableLoader)
                         .writeParallelism(sinkParallelism)
                         .build();
                 break;
             case Constants.SINK_TYPE_KAFKA:
-                buildKafkaSinkStream(sourceStream, (KafkaSinkInfo) sinkInfo, properties, config, sinkParallelism);
+                checkState(sinkInfo instanceof KafkaSinkInfo);
+                SerializationSchema<Row> schema = SerializationSchemaFactory.build(sinkInfo.getFields(),
+                        ((KafkaSinkInfo) sinkInfo).getSerializationInfo());
+                sourceStream
+                        .addSink(buildKafkaSink((KafkaSinkInfo) sinkInfo, properties, schema, config))
+                        .uid(Constants.SINK_UID)
+                        .name("Kafka Sink")
+                        .setParallelism(sinkParallelism);
                 break;
             default:
                 throw new IllegalArgumentException("Unsupported sink type " + sinkType);
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/SerializedRecord.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/SerializedRecord.java
new file mode 100644
index 0000000..057c8e6
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/SerializedRecord.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink;
+
+import java.io.Serializable;
+
+public class SerializedRecord implements Serializable {
+
+    private static final long serialVersionUID = 5418156417016358730L;
+
+    // Event time
+    private long timestampMillis;
+
+    private byte[] data;
+
+    /**
+     * Just satisfy requirement of Flink Pojo definition.
+     */
+    public SerializedRecord() {
+
+    }
+
+    public SerializedRecord(long timestampMillis, byte[] data) {
+        this.timestampMillis = timestampMillis;
+        this.data = data;
+    }
+
+    public void setTimestampMillis(long timestampMillis) {
+        this.timestampMillis = timestampMillis;
+    }
+
+    public void setData(byte[] data) {
+        this.data = data;
+    }
+
+    public long getTimestampMillis() {
+        return timestampMillis;
+    }
+
+    public byte[] getData() {
+        return data;
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/clickhouse/ClickhouseRowSinkFunction.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/clickhouse/ClickhouseRowSinkFunction.java
index da37cda..812bdec 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/clickhouse/ClickhouseRowSinkFunction.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/clickhouse/ClickhouseRowSinkFunction.java
@@ -29,6 +29,8 @@ import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
 
 public class ClickhouseRowSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
 
+    private static final long serialVersionUID = -2043675961013551232L;
+
     private final ClickHouseSinkFunction clickHouseSinkFunction;
 
     public ClickhouseRowSinkFunction(ClickHouseSinkInfo clickHouseSinkInfo) {
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationSchemaBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationSchemaBuilder.java
new file mode 100644
index 0000000..7ca93a5
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationSchemaBuilder.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.canal.CanalJsonDecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LocalZonedTimestampFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.types.utils.DataTypeUtils.validateInputDataType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToDataType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
+
+public class CanalDeserializationSchemaBuilder {
+
+    private static final List<String> ALL_SUPPORTED_METADATA_KEYS =
+            Arrays.asList("database", "table", "sql-type", "pk-names", "ingestion-timestamp", "event-timestamp");
+
+    public static DeserializationSchema<Row> build(
+            FieldInfo[] fieldInfos,
+            CanalDeserializationInfo deserializationInfo
+    ) throws IOException, ClassNotFoundException {
+        String timestampFormatStandard = deserializationInfo.getTimestampFormatStandard();
+        boolean includeMetadata = deserializationInfo.isIncludeMetadata();
+        CanalJsonDecodingFormat canalJsonDecodingFormat = createCanalJsonDecodingFormat(
+                deserializationInfo.getDatabase(),
+                deserializationInfo.getTable(),
+                deserializationInfo.isIgnoreParseErrors(),
+                timestampFormatStandard,
+                includeMetadata
+        );
+
+        FieldInfo[] convertedInputFields = convertDateToStringFormatInfo(fieldInfos);
+        DeserializationSchema<RowData> canalSchema = canalJsonDecodingFormat.createRuntimeDecoder(
+                new DynamicTableSource.Context() {
+                    @Override
+                    public <T> TypeInformation<T> createTypeInformation(DataType dataType) {
+                        validateInputDataType(dataType);
+                        return InternalTypeInfo.of(dataType.getLogicalType());
+                    }
+
+                    @Override
+                    public DynamicTableSource.DataStructureConverter createDataStructureConverter(DataType dataType) {
+                        return null;
+                    }
+                },
+                convertFieldInfosToDataType(convertedInputFields)
+        );
+
+        return wrapCanalDeserializationSchema(canalSchema, includeMetadata, fieldInfos, timestampFormatStandard);
+    }
+
+    private static DeserializationSchema<Row> wrapCanalDeserializationSchema(
+            DeserializationSchema<RowData> canalSchema,
+            boolean includeMetadata,
+            FieldInfo[] origFieldInfos,
+            String timestampFormatStandard
+    ) throws IOException, ClassNotFoundException {
+        FieldInfo[] allFields;
+        if (includeMetadata) {
+            allFields = new FieldInfo[origFieldInfos.length + ALL_SUPPORTED_METADATA_KEYS.size()];
+            System.arraycopy(origFieldInfos, 0, allFields, 0, origFieldInfos.length);
+            FieldInfo[] metadataFields = buildMetadataFields(timestampFormatStandard);
+            System.arraycopy(metadataFields, 0, allFields, origFieldInfos.length, metadataFields.length);
+        } else {
+            allFields = origFieldInfos;
+        }
+
+        FieldInfo[] convertedAllFields = convertDateToStringFormatInfo(allFields);
+        RowDataToRowDeserializationSchemaWrapper rowDataToRowSchema =
+                new RowDataToRowDeserializationSchemaWrapper(canalSchema, convertedAllFields);
+        return new CustomDateFormatDeserializationSchemaWrapper(rowDataToRowSchema, extractFormatInfos(allFields));
+    }
+
+    private static CanalJsonDecodingFormat createCanalJsonDecodingFormat(
+            String database,
+            String table,
+            boolean ignoreParseErrors,
+            String timestampFormatStandard,
+            boolean includeMetadata
+    ) {
+        TimestampFormat timestampFormat = getTimestampFormatStandard(timestampFormatStandard);
+        CanalJsonDecodingFormat canalJsonDecodingFormat =
+                new CanalJsonDecodingFormat(database, table, ignoreParseErrors, timestampFormat);
+        if (includeMetadata) {
+            canalJsonDecodingFormat.applyReadableMetadata(ALL_SUPPORTED_METADATA_KEYS);
+        }
+
+        return canalJsonDecodingFormat;
+    }
+
+    private static FieldInfo[] buildMetadataFields(String timestampStandard) {
+        return new FieldInfo[]{
+                new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(0), StringFormatInfo.INSTANCE),
+                new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(1), StringFormatInfo.INSTANCE),
+                new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(2),
+                        new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE)),
+                new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(3), new ArrayFormatInfo(StringFormatInfo.INSTANCE)),
+                new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(4), new LocalZonedTimestampFormatInfo(timestampStandard)),
+                new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(5), new LocalZonedTimestampFormatInfo(timestampStandard))
+        };
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapper.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapper.java
new file mode 100644
index 0000000..c3cc687
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapper.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.shaded.guava18.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.ParseException;
+
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.isStandardTimestampFormat;
+
+public class CustomDateFormatDeserializationSchemaWrapper implements DeserializationSchema<Row> {
+
+    private static final long serialVersionUID = -8124501384364175884L;
+
+    private final DeserializationSchema<Row> innerSchema;
+
+    private final FormatInfo[] formatInfos;
+
+    public CustomDateFormatDeserializationSchemaWrapper(
+            DeserializationSchema<Row> innerSchema, FormatInfo[] formatInfos) {
+        this.innerSchema = checkNotNull(innerSchema);
+        this.formatInfos = checkNotNull(formatInfos);
+    }
+
+    @Override
+    public Row deserialize(byte[] message) {
+        throw new RuntimeException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<Row> out) throws IOException {
+        ListCollector<Row> collector = new ListCollector<>();
+        innerSchema.deserialize(message, collector);
+        for (Row row : collector.getInnerList()) {
+            try {
+                out.collect(fromStringToDateAndTime(row));
+            } catch (ParseException e) {
+                throw new IOException("Failed to parse input date or time, error is " + e);
+            }
+        }
+    }
+
+    @Override
+    public boolean isEndOfStream(Row nextElement) {
+        return innerSchema.isEndOfStream(nextElement);
+    }
+
+    @Override
+    public TypeInformation<Row> getProducedType() {
+        return innerSchema.getProducedType();
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        innerSchema.open(context);
+    }
+
+    @VisibleForTesting
+    Row fromStringToDateAndTime(Row inputRow) throws ParseException {
+        int arity = inputRow.getArity();
+        Row outputRow = new Row(arity);
+        for (int i = 0; i < arity; i++) {
+            outputRow.setField(i, convert(inputRow.getField(i), formatInfos[i]));
+        }
+        outputRow.setKind(inputRow.getKind());
+        return outputRow;
+    }
+
+    private Object convert(Object input, FormatInfo formatInfo) throws ParseException {
+
+        if (formatInfo instanceof DateFormatInfo && !isStandardTimestampFormat(formatInfo)) {
+            checkState(input instanceof String);
+            java.util.Date date =
+                    FastDateFormat.getInstance(((DateFormatInfo) formatInfo).getFormat()).parse((String) input);
+            return new Date(date.getTime());
+
+        } else if (formatInfo instanceof TimeFormatInfo && !isStandardTimestampFormat(formatInfo)) {
+            checkState(input instanceof String);
+            java.util.Date date =
+                    FastDateFormat.getInstance(((TimeFormatInfo) formatInfo).getFormat()).parse((String) input);
+            return new Time(date.getTime());
+
+        } else if (formatInfo instanceof TimestampFormatInfo && !isStandardTimestampFormat(formatInfo)) {
+            checkState(input instanceof String);
+            java.util.Date date =
+                    FastDateFormat.getInstance(((TimestampFormatInfo) formatInfo).getFormat()).parse((String) input);
+            return new Timestamp(date.getTime());
+
+        } else {
+            return input;
+        }
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunction.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunction.java
new file mode 100644
index 0000000..e8bfe2f
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.commons.msg.InLongMsg;
+import org.apache.inlong.sort.singletenant.flink.SerializedRecord;
+
+import java.util.Iterator;
+
+public class DeserializationFunction extends ProcessFunction<SerializedRecord, Row> {
+
+    private static final long serialVersionUID = 6245991845787657154L;
+
+    private final DeserializationSchema<Row> deserializationSchema;
+
+    public DeserializationFunction(
+            DeserializationSchema<Row> deserializationSchema) {
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    @Override
+    public void processElement(
+            SerializedRecord value,
+            ProcessFunction<SerializedRecord, Row>.Context ctx,
+            Collector<Row> out
+    ) throws Exception {
+        InLongMsg inLongMsg = InLongMsg.parseFrom(value.getData());
+
+        for (String attr : inLongMsg.getAttrs()) {
+            Iterator<byte[]> iterator = inLongMsg.getIterator(attr);
+            if (iterator == null) {
+                continue;
+            }
+
+            while (iterator.hasNext()) {
+
+                byte[] bodyBytes = iterator.next();
+                if (bodyBytes == null || bodyBytes.length == 0) {
+                    continue;
+                }
+
+                deserializationSchema.deserialize(bodyBytes, out);
+            }
+        }
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
new file mode 100644
index 0000000..12f8dcb
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
+import org.apache.flink.formats.json.JsonRowDeserializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.AvroDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.JsonDeserializationInfo;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowTypeInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+
+public class DeserializationSchemaFactory {
+
+    public static DeserializationSchema<Row> build(
+            FieldInfo[] fieldInfos,
+            DeserializationInfo deserializationInfo) throws IOException, ClassNotFoundException {
+        if (deserializationInfo instanceof JsonDeserializationInfo) {
+            return buildJsonDeserializationSchema(fieldInfos);
+        } else if (deserializationInfo instanceof AvroDeserializationInfo) {
+            return buildAvroDeserializationSchema(fieldInfos);
+        } else if (deserializationInfo instanceof CanalDeserializationInfo) {
+            return CanalDeserializationSchemaBuilder.build(fieldInfos, (CanalDeserializationInfo) deserializationInfo);
+        } else {
+            return buildStringDeserializationSchema(fieldInfos);
+        }
+    }
+
+    private static DeserializationSchema<Row> buildJsonDeserializationSchema(
+            FieldInfo[] fieldInfos) throws IOException, ClassNotFoundException {
+        FieldInfo[] convertedFieldInfos = convertDateToStringFormatInfo(fieldInfos);
+        RowTypeInfo rowTypeInfo = convertFieldInfosToRowTypeInfo(convertedFieldInfos);
+        JsonRowDeserializationSchema jsonSchema = new JsonRowDeserializationSchema.Builder(rowTypeInfo).build();
+        return new CustomDateFormatDeserializationSchemaWrapper(jsonSchema, extractFormatInfos(fieldInfos));
+    }
+
+    private static DeserializationSchema<Row> buildAvroDeserializationSchema(FieldInfo[] fieldInfos) {
+        String avroSchemaInJson = buildAvroRecordSchemaInJson(fieldInfos);
+        return new AvroRowDeserializationSchema(avroSchemaInJson);
+    }
+
+    private static DeserializationSchema<Row> buildStringDeserializationSchema(FieldInfo[] fieldInfos) {
+        return new DeserializationSchema<Row>() {
+
+            private static final long serialVersionUID = 9206114128358065420L;
+
+            private final RowTypeInfo rowTypeInfo = convertFieldInfosToRowTypeInfo(fieldInfos);
+
+            @Override
+            public Row deserialize(byte[] message) {
+                Row row = new Row(1);
+                row.setField(0, new String(message, StandardCharsets.UTF_8));
+                return row;
+            }
+
+            @Override
+            public boolean isEndOfStream(Row nextElement) {
+                return false;
+            }
+
+            @Override
+            public TypeInformation<Row> getProducedType() {
+                return rowTypeInfo;
+            }
+        };
+    }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/ListCollector.java
similarity index 56%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
copy to inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/ListCollector.java
index fafae28..4df161f 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/ListCollector.java
@@ -15,28 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.protocol.deserialization;
+package org.apache.inlong.sort.singletenant.flink.deserialization;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.util.Collector;
 
-/**
- * .
- */
-public class CsvDeserializationInfo implements DeserializationInfo {
+import java.util.ArrayList;
+import java.util.List;
+
+public class ListCollector<T> implements Collector<T> {
 
-    private final char splitter;
+    private List<T> innerList = new ArrayList<>();
 
-    // TODO: support mapping index to field
+    @Override
+    public void collect(T record) {
+        innerList.add(record);
+    }
 
-    @JsonCreator
-    public CsvDeserializationInfo(
-            @JsonProperty("splitter") char splitter) {
-        this.splitter = splitter;
+    @Override
+    public void close() {
+        innerList = null;
     }
 
-    @JsonProperty("splitter")
-    public char getSplitter() {
-        return splitter;
+    public List<T> getInnerList() {
+        return innerList;
     }
+
 }
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/RowDataToRowDeserializationSchemaWrapper.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/RowDataToRowDeserializationSchemaWrapper.java
new file mode 100644
index 0000000..18749e4
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/RowDataToRowDeserializationSchemaWrapper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.protocol.FieldInfo;
+
+import java.io.IOException;
+
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowTypeInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.createRowConverter;
+
+public class RowDataToRowDeserializationSchemaWrapper implements DeserializationSchema<Row> {
+
+    private static final long serialVersionUID = 6918677263991851770L;
+
+    private final DeserializationSchema<RowData> innerSchema;
+
+    private final DataFormatConverters.RowConverter rowConverter;
+
+    private final TypeInformation<Row> producedTypeInfo;
+
+    public RowDataToRowDeserializationSchemaWrapper(
+            DeserializationSchema<RowData> innerSchema, FieldInfo[] fieldInfos) {
+        this.innerSchema = checkNotNull(innerSchema);
+        rowConverter = createRowConverter(checkNotNull(fieldInfos));
+        producedTypeInfo = convertFieldInfosToRowTypeInfo(fieldInfos);
+    }
+
+    @Override
+    public Row deserialize(byte[] message) {
+        throw new RuntimeException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<Row> out) throws IOException {
+        ListCollector<RowData> collector = new ListCollector<>();
+        innerSchema.deserialize(message, collector);
+        collector.getInnerList().forEach(record -> out.collect(rowConverter.toExternal(record)));
+    }
+
+    @Override
+    public boolean isEndOfStream(Row nextElement) {
+        return innerSchema.isEndOfStream(rowConverter.toInternal(nextElement));
+    }
+
+    @Override
+    public TypeInformation<Row> getProducedType() {
+        return producedTypeInfo;
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        innerSchema.open(context);
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
index 798a69f..298e052 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
@@ -18,34 +18,25 @@
 package org.apache.inlong.sort.singletenant.flink.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.types.Row;
 import org.apache.inlong.sort.configuration.Configuration;
-import org.apache.inlong.sort.configuration.Constants;
-import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
-import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
 import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
-import org.apache.inlong.sort.singletenant.flink.serialization.RowDataSerializationSchemaFactory;
-import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
 import org.apache.kafka.clients.producer.ProducerConfig;
 
 import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.inlong.sort.configuration.Constants.SINK_KAFKA_PRODUCER_POOL_SIZE;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.createRowConverter;
 
 public class KafkaSinkBuilder {
 
-    public static <T> SinkFunction<T> buildKafkaSink(
+    public static SinkFunction<Row> buildKafkaSink(
             KafkaSinkInfo kafkaSinkInfo,
             Map<String, Object> properties,
-            SerializationSchema<T> serializationSchema,
+            SerializationSchema<Row> schema,
             Configuration config
     ) {
         String topic = kafkaSinkInfo.getTopic();
@@ -53,7 +44,7 @@ public class KafkaSinkBuilder {
 
         return new FlinkKafkaProducer<>(
                 topic,
-                serializationSchema,
+                schema,
                 producerProperties,
                 new FlinkFixedPartitioner<>(),
                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
@@ -68,38 +59,4 @@ public class KafkaSinkBuilder {
         return producerProperties;
     }
 
-    public static void buildKafkaSinkStream(
-            DataStream<Row> sourceStream,
-            KafkaSinkInfo kafkaSinkInfo,
-            Map<String, Object> properties,
-            Configuration config,
-            int sinkParallelism
-    ) {
-        SerializationInfo serializationInfo = kafkaSinkInfo.getSerializationInfo();
-        if (serializationInfo instanceof CanalSerializationInfo) {
-            DataFormatConverters.RowConverter rowConverter = createRowConverter(kafkaSinkInfo);
-            DataStream<RowData> dataStream = sourceStream
-                    .map(rowConverter::toInternal)
-                    .uid(Constants.CONVERTER_UID)
-                    .name("Row to RowData Converter")
-                    .setParallelism(sinkParallelism);
-
-            SerializationSchema<RowData> schema = RowDataSerializationSchemaFactory.build(
-                    kafkaSinkInfo.getFields(), kafkaSinkInfo.getSerializationInfo());
-
-            dataStream
-                    .addSink(buildKafkaSink(kafkaSinkInfo, properties, schema, config))
-                    .uid(Constants.SINK_UID)
-                    .name("Kafka Sink")
-                    .setParallelism(sinkParallelism);
-        } else {
-            SerializationSchema<Row> schema = RowSerializationSchemaFactory.build(
-                    kafkaSinkInfo.getFields(), kafkaSinkInfo.getSerializationInfo());
-            sourceStream
-                    .addSink(buildKafkaSink(kafkaSinkInfo, properties, schema, config))
-                    .uid(Constants.SINK_UID)
-                    .name("Kafka Sink")
-                    .setParallelism(sinkParallelism);
-        }
-    }
 }
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/pulsar/PulsarSourceBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/pulsar/PulsarSourceBuilder.java
new file mode 100644
index 0000000..3359c78
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/pulsar/PulsarSourceBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.pulsar;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.flink.pulsar.PulsarDeserializationSchema;
+import org.apache.inlong.sort.flink.pulsar.PulsarSourceFunction;
+import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
+import org.apache.inlong.sort.singletenant.flink.SerializedRecord;
+import org.apache.pulsar.client.api.Message;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class PulsarSourceBuilder {
+
+    public static PulsarSourceFunction<SerializedRecord> buildPulsarSource(
+            PulsarSourceInfo sourceInfo,
+            Configuration config,
+            Map<String, Object> properties
+    ) {
+
+        Map<String, String> configMap = config.toMap();
+        if (properties != null && !properties.isEmpty()) {
+            for (Map.Entry<String, Object> entry : properties.entrySet()) {
+                configMap.put(entry.getKey(), entry.getValue().toString());
+            }
+        }
+
+        org.apache.flink.configuration.Configuration flinkConfig =
+                org.apache.flink.configuration.Configuration.fromMap(configMap);
+
+        return new PulsarSourceFunction<>(
+                sourceInfo.getAdminUrl(),
+                sourceInfo.getServiceUrl(),
+                sourceInfo.getTopic(),
+                sourceInfo.getSubscriptionName(),
+                sourceInfo.getAuthentication(),
+                new PulsarDeserializationSchemaImpl(),
+                flinkConfig
+        );
+    }
+
+    public static class PulsarDeserializationSchemaImpl implements PulsarDeserializationSchema<SerializedRecord> {
+
+        private static final long serialVersionUID = -3642110610339179932L;
+
+        @Override
+        public DeserializationResult<SerializedRecord> deserialize(
+                @SuppressWarnings("rawtypes") Message message) throws IOException {
+            final byte[] data = message.getData();
+            return DeserializationResult.of(new SerializedRecord(message.getEventTime(), data), data.length);
+        }
+
+        @Override
+        public TypeInformation<SerializedRecord> getProducedType() {
+            return TypeInformation.of(SerializedRecord.class);
+        }
+
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowDataSerializationSchemaFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationSchemaBuilder.java
similarity index 66%
rename from inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowDataSerializationSchemaFactory.java
rename to inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationSchemaBuilder.java
index 69d7378..c7ace10 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowDataSerializationSchemaFactory.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationSchemaBuilder.java
@@ -19,62 +19,51 @@ package org.apache.inlong.sort.singletenant.flink.serialization;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonOptions;
 import org.apache.flink.formats.json.canal.CanalJsonSerializationSchema;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
-import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
 
+import java.io.IOException;
+
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
 
-public class RowDataSerializationSchemaFactory {
+public class CanalSerializationSchemaBuilder {
 
-    private static final String CANAL_TIMESTAMP_STANDARD_SQL = "SQL";
-    private static final String CANAL_TIMESTAMP_STANDARD_ISO = "ISO_8601";
     private static final String CANAL_MAP_NULL_KEY_MODE_FAIL = "FAIL";
     private static final String CANAL_MAP_NULL_KEY_MODE_DROP = "DROP";
     private static final String CANAL_MAP_NULL_KEY_MODE_LITERAL = "LITERAL";
 
     private static final String CANAL_MAP_NULL_KEY_LITERAL_DEFAULT = "null";
 
-    public static SerializationSchema<RowData> build(FieldInfo[] fieldInfos, SerializationInfo serializationInfo) {
-        if (serializationInfo instanceof CanalSerializationInfo) {
-            return buildCanalRowDataSerializationSchema(fieldInfos, (CanalSerializationInfo) serializationInfo);
-        }
-
-        throw new IllegalArgumentException("Unsupported RowData serialization info: " + serializationInfo);
-    }
-
-    private static SerializationSchema<RowData> buildCanalRowDataSerializationSchema(
+    public static SerializationSchema<Row> build(
             FieldInfo[] fieldInfos,
             CanalSerializationInfo canalSerializationInfo
-    ) {
+    ) throws IOException, ClassNotFoundException {
         String mapNullKeyLiteral = canalSerializationInfo.getMapNullKeyLiteral();
         if (StringUtils.isEmpty(mapNullKeyLiteral)) {
             mapNullKeyLiteral = CANAL_MAP_NULL_KEY_LITERAL_DEFAULT;
         }
 
-        RowType rowType = convertFieldInfosToRowType(fieldInfos);
-        return new CanalJsonSerializationSchema(
-                rowType,
+        FieldInfo[] convertedFieldInfos = convertDateToStringFormatInfo(fieldInfos);
+        RowType convertedRowType = convertFieldInfosToRowType(convertedFieldInfos);
+        CanalJsonSerializationSchema canalSchema = new CanalJsonSerializationSchema(
+                convertedRowType,
                 getTimestampFormatStandard(canalSerializationInfo.getTimestampFormatStandard()),
                 getMapNullKeyMode(canalSerializationInfo.getMapNullKeyMod()),
                 mapNullKeyLiteral,
                 canalSerializationInfo.isEncodeDecimalAsPlainNumber()
         );
-    }
 
-    private static TimestampFormat getTimestampFormatStandard(String input) {
-        if (CANAL_TIMESTAMP_STANDARD_SQL.equals(input)) {
-            return TimestampFormat.SQL;
-        } else if (CANAL_TIMESTAMP_STANDARD_ISO.equals(input)) {
-            return TimestampFormat.ISO_8601;
-        }
+        RowToRowDataSerializationSchemaWrapper rowToRowDataSchema
+                = new RowToRowDataSerializationSchemaWrapper(canalSchema, convertedFieldInfos);
 
-        throw new IllegalArgumentException("Unsupported timestamp format standard: " + input);
+        return new CustomDateFormatSerializationSchemaWrapper(rowToRowDataSchema, extractFormatInfos(fieldInfos));
     }
 
     private static JsonOptions.MapNullKeyMode getMapNullKeyMode(String input) {
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapper.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapper.java
new file mode 100644
index 0000000..f271e39
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.serialization;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.shaded.guava18.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.isStandardTimestampFormat;
+
+public class CustomDateFormatSerializationSchemaWrapper implements SerializationSchema<Row> {
+
+    private static final long serialVersionUID = 7342088340154604198L;
+
+    private final SerializationSchema<Row> innerSchema;
+
+    private final FormatInfo[] formatInfos;
+
+    public CustomDateFormatSerializationSchemaWrapper(SerializationSchema<Row> innerSchema, FormatInfo[] formatInfos) {
+        this.innerSchema = checkNotNull(innerSchema);
+        this.formatInfos = checkNotNull(formatInfos);
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        innerSchema.open(context);
+    }
+
+    @Override
+    public byte[] serialize(Row element) {
+        Row outputRow = fromDateAndTimeToString(element);
+        return innerSchema.serialize(outputRow);
+    }
+
+    @VisibleForTesting
+    Row fromDateAndTimeToString(Row inputRow) {
+        int arity = inputRow.getArity();
+        Row outputRow = new Row(arity);
+        for (int i = 0; i < arity; i++) {
+            outputRow.setField(i, convert(inputRow.getField(i), formatInfos[i]));
+        }
+        outputRow.setKind(inputRow.getKind());
+        return outputRow;
+    }
+
+    private Object convert(Object input, FormatInfo formatInfo) {
+
+        if (formatInfo instanceof DateFormatInfo && !isStandardTimestampFormat(formatInfo)) {
+            checkState(input instanceof Date);
+            return FastDateFormat.getInstance(((DateFormatInfo) formatInfo).getFormat()).format(input);
+
+        } else if (formatInfo instanceof TimeFormatInfo && !isStandardTimestampFormat(formatInfo)) {
+            checkState(input instanceof Time);
+            return FastDateFormat.getInstance(((TimeFormatInfo) formatInfo).getFormat()).format(input);
+
+        } else if (formatInfo instanceof TimestampFormatInfo && !isStandardTimestampFormat(formatInfo)) {
+            checkState(input instanceof Timestamp);
+            return FastDateFormat.getInstance(((TimestampFormatInfo) formatInfo).getFormat()).format(input);
+
+        } else {
+            return input;
+        }
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowToRowDataSerializationSchemaWrapper.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowToRowDataSerializationSchemaWrapper.java
new file mode 100644
index 0000000..83ea0e4
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowToRowDataSerializationSchemaWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.serialization;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.protocol.FieldInfo;
+
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.createRowConverter;
+
+public class RowToRowDataSerializationSchemaWrapper implements SerializationSchema<Row> {
+
+    private static final long serialVersionUID = 2496100417131340005L;
+
+    private final SerializationSchema<RowData> innerSchema;
+
+    private final DataFormatConverters.RowConverter rowConverter;
+
+    public RowToRowDataSerializationSchemaWrapper(SerializationSchema<RowData> innerSchema, FieldInfo[] fieldInfos) {
+        this.innerSchema = checkNotNull(innerSchema);
+        this.rowConverter = createRowConverter(checkNotNull(fieldInfos));
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        innerSchema.open(context);
+    }
+
+    @Override
+    public byte[] serialize(Row element) {
+        return innerSchema.serialize(rowConverter.toInternal(element));
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowSerializationSchemaFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/SerializationSchemaFactory.java
similarity index 55%
rename from inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowSerializationSchemaFactory.java
rename to inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/SerializationSchemaFactory.java
index b2eb961..afb3bb9 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowSerializationSchemaFactory.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/SerializationSchemaFactory.java
@@ -22,41 +22,55 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.formats.avro.AvroRowSerializationSchema;
 import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.serialization.AvroSerializationInfo;
+import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
 import org.apache.inlong.sort.protocol.serialization.JsonSerializationInfo;
 import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowTypeInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
 
-public class RowSerializationSchemaFactory {
+public class SerializationSchemaFactory {
 
-    public static SerializationSchema<Row> build(FieldInfo[] fieldInfos, SerializationInfo serializationInfo) {
+    public static SerializationSchema<Row> build(
+            FieldInfo[] fieldInfos,
+            SerializationInfo serializationInfo
+    ) throws IOException, ClassNotFoundException {
         if (serializationInfo instanceof JsonSerializationInfo) {
-            return buildJsonRowSerializationSchema(fieldInfos);
+            return buildJsonSerializationSchema(fieldInfos);
         } else if (serializationInfo instanceof AvroSerializationInfo) {
-            return buildAvroRowSerializationSchema(fieldInfos);
+            return buildAvroSerializationSchema(fieldInfos);
+        } else if (serializationInfo instanceof CanalSerializationInfo) {
+            return CanalSerializationSchemaBuilder.build(fieldInfos, (CanalSerializationInfo) serializationInfo);
         } else {
-            return buildStringRowSerializationSchema();
+            return buildStringSerializationSchema(extractFormatInfos(fieldInfos));
         }
     }
 
-    private static SerializationSchema<Row> buildJsonRowSerializationSchema(FieldInfo[] fieldInfos) {
-        RowTypeInfo rowTypeInfo = convertFieldInfosToRowTypeInfo(fieldInfos);
+    private static SerializationSchema<Row> buildJsonSerializationSchema(
+            FieldInfo[] fieldInfos
+    ) throws IOException, ClassNotFoundException {
+        FieldInfo[] convertedFieldInfos = convertDateToStringFormatInfo(fieldInfos);
+        RowTypeInfo convertedRowTypeInfo = convertFieldInfosToRowTypeInfo(convertedFieldInfos);
         JsonRowSerializationSchema.Builder builder = JsonRowSerializationSchema.builder();
-        return builder.withTypeInfo(rowTypeInfo).build();
+        JsonRowSerializationSchema innerSchema = builder.withTypeInfo(convertedRowTypeInfo).build();
+        return new CustomDateFormatSerializationSchemaWrapper(innerSchema, extractFormatInfos(fieldInfos));
     }
 
-    private static SerializationSchema<Row> buildAvroRowSerializationSchema(FieldInfo[] fieldInfos) {
+    private static SerializationSchema<Row> buildAvroSerializationSchema(FieldInfo[] fieldInfos) {
         String avroSchemaInJson = buildAvroRecordSchemaInJson(fieldInfos);
         return new AvroRowSerializationSchema(avroSchemaInJson);
     }
 
-    private static SerializationSchema<Row> buildStringRowSerializationSchema() {
-        return new SerializationSchema<Row>() {
+    private static SerializationSchema<Row> buildStringSerializationSchema(FormatInfo[] formatInfos) {
+        SerializationSchema<Row> stringSchema = new SerializationSchema<Row>() {
 
             private static final long serialVersionUID = -6818985955456373916L;
 
@@ -65,5 +79,8 @@ public class RowSerializationSchemaFactory {
                 return element.toString().getBytes(StandardCharsets.UTF_8);
             }
         };
+
+        return new CustomDateFormatSerializationSchemaWrapper(stringSchema, formatInfos);
     }
+
 }
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
index 0ee5b7a..cd6f610 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.singletenant.flink.utils;
 
 import org.apache.avro.Schema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.TableSchema.Builder;
 import org.apache.flink.table.data.util.DataFormatConverters;
@@ -27,20 +28,32 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
 import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.formats.common.TypeInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.sink.SinkInfo;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
 
 import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema;
+import static org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter.toDataType;
 import static org.apache.inlong.sort.formats.base.TableFormatUtils.deriveLogicalType;
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_ISO_8601;
+import static org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_SQL;
 
 public class CommonUtils {
 
-    public static TableSchema getTableSchema(SinkInfo sinkInfo) {
+    public static TableSchema getTableSchema(FieldInfo[] fieldInfos) {
         TableSchema.Builder builder = new Builder();
-        FieldInfo[] fieldInfos = sinkInfo.getFields();
 
         for (FieldInfo fieldInfo : fieldInfos) {
             builder.field(
@@ -68,7 +81,7 @@ public class CommonUtils {
         return new org.apache.flink.api.java.typeutils.RowTypeInfo(typeInformationArray, fieldNames);
     }
 
-    public static String buildAvroRecordSchemaInJson(FieldInfo[] fieldInfos) {
+    public static LogicalType convertFieldInfosToLogicalType(FieldInfo[] fieldInfos) {
         int fieldLength = fieldInfos.length;
         String[] fieldNames = new String[fieldLength];
         FormatInfo[] fieldFormatInfos = new FormatInfo[fieldLength];
@@ -78,7 +91,11 @@ public class CommonUtils {
         }
 
         RowFormatInfo rowFormatInfo = new RowFormatInfo(fieldNames, fieldFormatInfos);
-        LogicalType logicalType = deriveLogicalType(rowFormatInfo);
+        return deriveLogicalType(rowFormatInfo);
+    }
+
+    public static String buildAvroRecordSchemaInJson(FieldInfo[] fieldInfos) {
+        LogicalType logicalType = convertFieldInfosToLogicalType(fieldInfos);
         Schema schema = convertToSchema(logicalType);
 
         if (schema.isUnion()) {
@@ -87,8 +104,13 @@ public class CommonUtils {
         return schema.toString();
     }
 
-    public static DataFormatConverters.RowConverter createRowConverter(SinkInfo sinkInfo) {
-        DataType[] fieldDataTypes = getTableSchema(sinkInfo).getFieldDataTypes();
+    public static DataType convertFieldInfosToDataType(FieldInfo[] fieldInfos) {
+        LogicalType logicalType = convertFieldInfosToLogicalType(fieldInfos);
+        return toDataType(logicalType);
+    }
+
+    public static DataFormatConverters.RowConverter createRowConverter(FieldInfo[] fieldInfos) {
+        DataType[] fieldDataTypes = getTableSchema(fieldInfos).getFieldDataTypes();
         return new DataFormatConverters.RowConverter(fieldDataTypes);
     }
 
@@ -104,4 +126,71 @@ public class CommonUtils {
         return RowType.of(fieldLogicalTypes, fieldNames);
     }
 
+    public static TimestampFormat getTimestampFormatStandard(String input) {
+        if (DATE_AND_TIME_STANDARD_SQL.equals(input)) {
+            return TimestampFormat.SQL;
+        } else if (DATE_AND_TIME_STANDARD_ISO_8601.equals(input)) {
+            return TimestampFormat.ISO_8601;
+        }
+
+        throw new IllegalArgumentException("Unsupported timestamp format standard: " + input);
+    }
+
+    public static Object deepCopy(Serializable input) throws IOException, ClassNotFoundException {
+        byte[] bytes;
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+            oos.writeObject(input);
+            bytes = baos.toByteArray();
+        }
+
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+             ObjectInputStream ois = new ObjectInputStream(bais)) {
+            return ois.readObject();
+        }
+    }
+
+    // TODO: support map and array
+    public static FieldInfo[] convertDateToStringFormatInfo(FieldInfo[] inputInfos)
+            throws IOException, ClassNotFoundException {
+        FieldInfo[] copiedInfos = (FieldInfo[]) deepCopy(inputInfos);
+        for (FieldInfo copiedInfo : copiedInfos) {
+            FormatInfo formatInfo = copiedInfo.getFormatInfo();
+            if (formatInfo instanceof DateFormatInfo
+                    || formatInfo instanceof TimeFormatInfo
+                    || formatInfo instanceof TimestampFormatInfo) {
+                if (!isStandardTimestampFormat(formatInfo)) {
+                    copiedInfo.setFormatInfo(StringFormatInfo.INSTANCE);
+                }
+            }
+        }
+
+        return copiedInfos;
+    }
+
+    public static boolean isStandardTimestampFormat(FormatInfo formatInfo) {
+        if (formatInfo instanceof DateFormatInfo) {
+            String format = ((DateFormatInfo) formatInfo).getFormat();
+            return DATE_AND_TIME_STANDARD_SQL.equals(format) || DATE_AND_TIME_STANDARD_ISO_8601.equals(format);
+        } else if (formatInfo instanceof TimeFormatInfo) {
+            String format = ((TimeFormatInfo) formatInfo).getFormat();
+            return DATE_AND_TIME_STANDARD_SQL.equals(format) || DATE_AND_TIME_STANDARD_ISO_8601.equals(format);
+        } else if (formatInfo instanceof TimestampFormatInfo) {
+            String format = ((TimestampFormatInfo) formatInfo).getFormat();
+            return DATE_AND_TIME_STANDARD_SQL.equals(format) || DATE_AND_TIME_STANDARD_ISO_8601.equals(format);
+        }
+
+        return false;
+    }
+
+    public static FormatInfo[] extractFormatInfos(FieldInfo[] fieldInfos) {
+        int length = fieldInfos.length;
+        FormatInfo[] output = new FormatInfo[length];
+        for (int i = 0; i < length; i++) {
+            output[i] = fieldInfos[i].getFormatInfo();
+        }
+
+        return output;
+    }
+
 }
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/AvroDeserializationTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/AvroDeserializationTest.java
new file mode 100644
index 0000000..e4c3946
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/AvroDeserializationTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.BinaryFormatInfo;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.AvroDeserializationInfo;
+import org.apache.inlong.sort.protocol.serialization.AvroSerializationInfo;
+import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class AvroDeserializationTest {
+
+    private final FieldInfo[] fieldInfos = new FieldInfo[]{
+            new FieldInfo("id", LongFormatInfo.INSTANCE),
+            new FieldInfo("name", StringFormatInfo.INSTANCE),
+            new FieldInfo("bytes", BinaryFormatInfo.INSTANCE),
+            new FieldInfo("date", new DateFormatInfo("yyyy-MM-dd")),
+            new FieldInfo("time", new TimeFormatInfo("HH:mm:ss")),
+            new FieldInfo("timestamp", new TimestampFormatInfo("yyyy-MM-dd HH:mm:ss")),
+            new FieldInfo("map",
+                    new MapFormatInfo(StringFormatInfo.INSTANCE, LongFormatInfo.INSTANCE)),
+            new FieldInfo("map2map", new MapFormatInfo(StringFormatInfo.INSTANCE,
+                    new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE)))
+    };
+
+    private Row generateTestRow() {
+        Row testRow = new Row(8);
+        testRow.setField(0, 1238123899121L);
+        testRow.setField(1, "testName");
+
+        byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6};
+        testRow.setField(2, bytes);
+
+        testRow.setField(3, Date.valueOf("1990-10-14"));
+        testRow.setField(4, Time.valueOf("12:12:43"));
+        testRow.setField(5, Timestamp.valueOf("1990-10-14 12:12:43"));
+
+        Map<String, Long> map = new HashMap<>();
+        map.put("flink", 123L);
+        testRow.setField(6, map);
+
+        Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
+        Map<String, Integer> innerMap = new HashMap<>();
+        innerMap.put("key", 234);
+        nestedMap.put("inner_map", innerMap);
+        testRow.setField(7, nestedMap);
+
+        return testRow;
+    }
+
+    @Test
+    public void testAvroDeserializationSchema() throws IOException, ClassNotFoundException {
+        SerializationSchema<Row> serializationSchema =
+                SerializationSchemaFactory.build(fieldInfos, new AvroSerializationInfo());
+
+        Row expectedRow = generateTestRow();
+        byte[] bytes = serializationSchema.serialize(expectedRow);
+
+        DeserializationSchema<Row> deserializationSchema =
+                DeserializationSchemaFactory.build(fieldInfos, new AvroDeserializationInfo());
+        Row actualRow = deserializationSchema.deserialize(bytes);
+
+        assertEquals(expectedRow, actualRow);
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationTest.java
new file mode 100644
index 0000000..b649ef0
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.BinaryFormatInfo;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CanalDeserializationTest {
+
+    private final FieldInfo[] fieldInfos = new FieldInfo[]{
+            new FieldInfo("id", LongFormatInfo.INSTANCE),
+            new FieldInfo("name", StringFormatInfo.INSTANCE),
+            new FieldInfo("bytes", BinaryFormatInfo.INSTANCE),
+            new FieldInfo("date", new DateFormatInfo("yyyy-MM-dd")),
+            new FieldInfo("time", new TimeFormatInfo("HH:mm:ss")),
+            new FieldInfo("timestamp", new TimestampFormatInfo("yyyy-MM-dd HH:mm:ss")),
+            new FieldInfo("map",
+                    new MapFormatInfo(StringFormatInfo.INSTANCE, LongFormatInfo.INSTANCE)),
+            new FieldInfo("map2map", new MapFormatInfo(StringFormatInfo.INSTANCE,
+                    new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE)))
+    };
+
+    private Row generateTestRow() {
+        Row testRow = new Row(8);
+        testRow.setField(0, 1238123899121L);
+        testRow.setField(1, "testName");
+
+        byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6};
+        testRow.setField(2, bytes);
+
+        testRow.setField(3, Date.valueOf("1990-10-14"));
+        testRow.setField(4, Time.valueOf("12:12:43"));
+        testRow.setField(5, Timestamp.valueOf("1990-10-14 12:12:43"));
+
+        Map<String, Long> map = new HashMap<>();
+        map.put("flink", 123L);
+        testRow.setField(6, map);
+
+        Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
+        Map<String, Integer> innerMap = new HashMap<>();
+        innerMap.put("key", 234);
+        nestedMap.put("inner_map", innerMap);
+        testRow.setField(7, nestedMap);
+
+        return testRow;
+    }
+
+    @Test
+    public void testDeserializeStringWithoutMetadata() throws IOException, ClassNotFoundException {
+        String testString = "{\n"
+                + "    \"data\":[\n"
+                + "        {\n"
+                + "            \"id\":1238123899121,\n"
+                + "            \"name\":\"testName\",\n"
+                + "            \"bytes\":\"AQIDBAUG\",\n"
+                + "            \"date\":\"1990-10-14\",\n"
+                + "            \"time\":\"12:12:43\",\n"
+                + "            \"timestamp\":\"1990-10-14 12:12:43\",\n"
+                + "            \"map\":{\n"
+                + "                \"flink\":123\n"
+                + "            },\n"
+                + "            \"map2map\":{\n"
+                + "                \"inner_map\":{\n"
+                + "                    \"key\":234\n"
+                + "                }\n"
+                + "            }\n"
+                + "        }\n"
+                + "    ],\n"
+                + "    \"type\":\"INSERT\"\n"
+                + "}";
+        byte[] testBytes = testString.getBytes(StandardCharsets.UTF_8);
+        DeserializationSchema<Row> schema = DeserializationSchemaFactory.build(
+                fieldInfos,
+                new CanalDeserializationInfo(null, null, false, "ISO_8601", false)
+        );
+        ListCollector<Row> collector = new ListCollector<>();
+        schema.deserialize(testBytes, collector);
+        assertEquals(generateTestRow(), collector.getInnerList().get(0));
+    }
+
+    @Test
+    public void testCanalDeserializationSchema() throws IOException, ClassNotFoundException {
+        String testCanalData = "{\n"
+                + "    \"data\":[\n"
+                + "        {\n"
+                + "            \"id\":\"101\",\n"
+                + "            \"name\":\"scooter\",\n"
+                + "            \"description\":\"Small 2-wheel scooter\",\n"
+                + "            \"weight\":\"3.14\"\n"
+                + "        },\n"
+                + "        {\n"
+                + "            \"id\":\"102\",\n"
+                + "            \"name\":\"car battery\",\n"
+                + "            \"description\":\"12V car battery\",\n"
+                + "            \"weight\":\"8.1\"\n"
+                + "        },\n"
+                + "        {\n"
+                + "            \"id\":\"103\",\n"
+                + "            \"name\":\"12-pack drill bits\",\n"
+                + "            \"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\n"
+                + "            \"weight\":\"0.8\"\n"
+                + "        }\n"
+                + "    ],\n"
+                + "    \"database\":\"inventory\",\n"
+                + "    \"es\":1589373515000,\n"
+                + "    \"id\":3,\n"
+                + "    \"isDdl\":false,\n"
+                + "    \"mysqlType\":{\n"
+                + "        \"id\":\"INTEGER\",\n"
+                + "        \"name\":\"VARCHAR(255)\",\n"
+                + "        \"description\":\"VARCHAR(512)\",\n"
+                + "        \"weight\":\"FLOAT\"\n"
+                + "    },\n"
+                + "    \"old\":null,\n"
+                + "    \"pkNames\":[\n"
+                + "        \"id\"\n"
+                + "    ],\n"
+                + "    \"sql\":\"\",\n"
+                + "    \"sqlType\":{\n"
+                + "        \"id\":4,\n"
+                + "        \"name\":12,\n"
+                + "        \"description\":12,\n"
+                + "        \"weight\":7\n"
+                + "    },\n"
+                + "    \"table\":\"products2\",\n"
+                + "    \"ts\":1589373515477,\n"
+                + "    \"type\":\"INSERT\"\n"
+                + "}";
+
+        byte[] testBytes = testCanalData.getBytes(StandardCharsets.UTF_8);
+        FieldInfo[] fieldInfos = new FieldInfo[]{
+                new FieldInfo("id", IntFormatInfo.INSTANCE),
+                new FieldInfo("name", StringFormatInfo.INSTANCE),
+                new FieldInfo("description", StringFormatInfo.INSTANCE),
+                new FieldInfo("weight", FloatFormatInfo.INSTANCE)
+        };
+
+        DeserializationSchema<Row> schemaWithoutFilter = DeserializationSchemaFactory.build(
+                fieldInfos,
+                new CanalDeserializationInfo(null, null, false, "ISO_8601", true)
+        );
+        ListCollector<Row> collector1 = new ListCollector<>();
+        schemaWithoutFilter.deserialize(testBytes, collector1);
+        List<Row> innerList = collector1.getInnerList();
+        assertEquals(3, innerList.size());
+        Row row = innerList.get(0);
+        assertEquals(101, row.getField(0));
+        assertEquals("scooter", row.getField(1).toString());
+        assertEquals("Small 2-wheel scooter", row.getField(2).toString());
+        assertEquals(3.14f, (Float) row.getField(3), 0);
+        assertEquals("inventory", row.getField(4).toString());
+        assertEquals("products2", row.getField(5).toString());
+        assertEquals(4, ((Map<?, ?>) row.getField(6)).size());
+        assertEquals("id", ((String[]) row.getField(7))[0]);
+        // "es" and "ts" are treated as local in flink-canal
+        assertEquals(1589373515477L, TimestampData.fromLocalDateTime((LocalDateTime) row.getField(8)).getMillisecond());
+        assertEquals(1589373515000L, TimestampData.fromLocalDateTime((LocalDateTime) row.getField(9)).getMillisecond());
+
+        DeserializationSchema<Row> schemaWithFilter = DeserializationSchemaFactory.build(
+                fieldInfos,
+                new CanalDeserializationInfo("NoExistDB", null, false, "ISO_8601", true)
+        );
+        ListCollector<Row> collector2 = new ListCollector<>();
+        schemaWithFilter.deserialize(testBytes, collector2);
+        assertTrue(collector2.getInnerList().isEmpty());
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapperTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapperTest.java
new file mode 100644
index 0000000..0a5dbd4
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapperTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.ParseException;
+
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CustomDateFormatDeserializationSchemaWrapperTest {
+
+    @Test
+    public void testFromStringToDateAndTime() throws IOException, ClassNotFoundException, ParseException {
+        FieldInfo[] fieldInfos = new FieldInfo[]{
+                new FieldInfo("f1", new DateFormatInfo()),
+                new FieldInfo("f2", new TimeFormatInfo()),
+                new FieldInfo("f3", new TimestampFormatInfo()),
+                new FieldInfo("f4", StringFormatInfo.INSTANCE)
+        };
+
+        DeserializationSchema<Row> stringSchema = DeserializationSchemaFactory.build(fieldInfos, null);
+        CustomDateFormatDeserializationSchemaWrapper schemaWrapper
+                = new CustomDateFormatDeserializationSchemaWrapper(stringSchema, extractFormatInfos(fieldInfos));
+
+        Row testRow = Row.of("2022-02-15", "15:52:30", "2022-02-15 15:52:30", "don't convert");
+        Row resultRow = schemaWrapper.fromStringToDateAndTime(testRow);
+        assertTrue(resultRow.getField(0) instanceof Date);
+        assertTrue(resultRow.getField(1) instanceof Time);
+        assertTrue(resultRow.getField(2) instanceof Timestamp);
+
+        final Row expectedRow = Row.of(Date.valueOf("2022-02-15"), Time.valueOf("15:52:30"),
+                Timestamp.valueOf("2022-02-15 15:52:30"), "don't convert");
+        assertEquals(expectedRow, resultRow);
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunctionTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunctionTest.java
new file mode 100644
index 0000000..0f88049
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunctionTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.types.Row;
+import org.apache.inlong.commons.msg.InLongMsg;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.singletenant.flink.SerializedRecord;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+
+public class DeserializationFunctionTest {
+
+    @Test
+    public void testProcessElement() throws Exception {
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        String testData = "testData";
+        inLongMsg.addMsg("m=12&iname=tid", testData.getBytes(StandardCharsets.UTF_8));
+        SerializedRecord serializedRecord = new SerializedRecord(1, inLongMsg.buildArray());
+
+        DeserializationFunction function = new DeserializationFunction(
+                DeserializationSchemaFactory.build(
+                        new FieldInfo[]{new FieldInfo("content", StringFormatInfo.INSTANCE)}, null
+                )
+        );
+
+        ListCollector<Row> collector = new ListCollector<>();
+        function.processElement(serializedRecord,null, collector);
+        Row row = collector.getInnerList().get(0);
+        assertEquals(1, row.getArity());
+        assertEquals(testData, row.getField(0));
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/JsonDeserializationTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/JsonDeserializationTest.java
new file mode 100644
index 0000000..7e57422
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/JsonDeserializationTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.deserialization;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.BinaryFormatInfo;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.JsonDeserializationInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class JsonDeserializationTest {
+
+    private final FieldInfo[] fieldInfos = new FieldInfo[]{
+            new FieldInfo("id", LongFormatInfo.INSTANCE),
+            new FieldInfo("name", StringFormatInfo.INSTANCE),
+            new FieldInfo("bytes", BinaryFormatInfo.INSTANCE),
+            new FieldInfo("date", new DateFormatInfo("yyyy-MM-dd")),
+            new FieldInfo("time", new TimeFormatInfo("HH:mm:ss")),
+            new FieldInfo("timestamp", new TimestampFormatInfo("yyyy-MM-dd HH:mm:ss")),
+            new FieldInfo("map",
+                    new MapFormatInfo(StringFormatInfo.INSTANCE, LongFormatInfo.INSTANCE)),
+            new FieldInfo("map2map", new MapFormatInfo(StringFormatInfo.INSTANCE,
+                    new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE)))
+    };
+
+    private Row generateTestRow() {
+        Row testRow = new Row(8);
+        testRow.setField(0, 1238123899121L);
+        testRow.setField(1, "testName");
+
+        byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6};
+        testRow.setField(2, bytes);
+
+        testRow.setField(3, Date.valueOf("1990-10-14"));
+        testRow.setField(4, Time.valueOf("12:12:43"));
+        testRow.setField(5, Timestamp.valueOf("1990-10-14 12:12:43"));
+
+        Map<String, Long> map = new HashMap<>();
+        map.put("flink", 123L);
+        testRow.setField(6, map);
+
+        Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
+        Map<String, Integer> innerMap = new HashMap<>();
+        innerMap.put("key", 234);
+        nestedMap.put("inner_map", innerMap);
+        testRow.setField(7, nestedMap);
+
+        return testRow;
+    }
+
+    @Test
+    public void testJsonDeserializationSchema() throws IOException, ClassNotFoundException {
+        Row expectedRow = generateTestRow();
+        ObjectMapper objectMapper = new ObjectMapper();
+        ObjectNode root = objectMapper.createObjectNode();
+        //noinspection ConstantConditions
+        root.put("id", (long) expectedRow.getField(0));
+        root.put("name", (String) expectedRow.getField(1));
+        root.put("bytes", (byte[]) expectedRow.getField(2));
+        root.put("date", "1990-10-14");
+        root.put("time", "12:12:43");
+        root.put("timestamp", "1990-10-14 12:12:43");
+        root.putObject("map").put("flink", 123);
+        root.putObject("map2map").putObject("inner_map").put("key", 234);
+
+        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+
+        DeserializationSchema<Row> schema =
+                DeserializationSchemaFactory.build(fieldInfos, new JsonDeserializationInfo());
+
+        ListCollector<Row> collector = new ListCollector<>();
+        schema.deserialize(serializedJson, collector);
+
+        assertEquals(expectedRow, collector.getInnerList().get(0));
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
index 9e21991..1b98346 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
@@ -26,7 +26,9 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.types.Row;
+import org.apache.inlong.sort.configuration.Configuration;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -52,6 +54,7 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
@@ -61,10 +64,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
 import static org.apache.inlong.sort.singletenant.flink.utils.NetUtils.getUnusedLocalPort;
 import static org.junit.Assert.assertNull;
 
-public abstract class KafkaSinkTestBase<T> {
+public abstract class KafkaSinkTestBase {
 
     private static final Logger logger = LoggerFactory.getLogger(KafkaSinkTestBase.class);
 
@@ -79,13 +83,13 @@ public abstract class KafkaSinkTestBase<T> {
     private AdminClient kafkaAdmin;
     private KafkaConsumer<String, Bytes> kafkaConsumer;
     private Properties kafkaClientProperties;
-    protected String brokerConnStr;
+    private String brokerConnStr;
 
     // prepare data below in subclass
     protected String topic;
     protected List<Row> testRows;
     protected FieldInfo[] fieldInfos;
-    protected SerializationSchema<T> serializationSchema;
+    protected SerializationSchema<Row> serializationSchema;
 
     @Before
     public void setup() throws Exception {
@@ -160,7 +164,7 @@ public abstract class KafkaSinkTestBase<T> {
                 "The topic metadata failed to propagate to Kafka broker.");
     }
 
-    protected abstract void prepareData();
+    protected abstract void prepareData() throws IOException, ClassNotFoundException;
 
     @After
     public void clean() throws IOException {
@@ -198,7 +202,14 @@ public abstract class KafkaSinkTestBase<T> {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
             try {
-                buildJob(env, testingSource);
+                env.addSource(testingSource).addSink(
+                        buildKafkaSink(
+                                new KafkaSinkInfo(new FieldInfo[]{}, brokerConnStr, topic, null),
+                                new HashMap<>(),
+                                serializationSchema,
+                                new Configuration()
+                        )
+                );
                 env.execute();
                 testFinishedCountDownLatch.await();
             } catch (Exception e) {
@@ -211,8 +222,6 @@ public abstract class KafkaSinkTestBase<T> {
         testFinishedCountDownLatch.countDown();
     }
 
-    protected abstract void buildJob(StreamExecutionEnvironment env, TestingSource testingSource);
-
     private void verify() throws Exception {
         kafkaConsumer.subscribe(Collections.singleton(topic));
         List<Bytes> results = new ArrayList<>();
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRow.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRow.java
deleted file mode 100644
index 9a612c7..0000000
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRow.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.kafka;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.types.Row;
-import org.apache.inlong.sort.configuration.Configuration;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
-
-import java.util.HashMap;
-
-import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
-
-public abstract class KafkaSinkTestBaseForRow extends KafkaSinkTestBase<Row> {
-
-    @Override
-    protected void buildJob(StreamExecutionEnvironment env, TestingSource testingSource) {
-        env.addSource(testingSource).addSink(
-                buildKafkaSink(
-                        new KafkaSinkInfo(new FieldInfo[]{}, brokerConnStr, topic, null),
-                        new HashMap<>(),
-                        serializationSchema,
-                        new Configuration()
-                )
-        );
-    }
-
-}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRowData.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRowData.java
deleted file mode 100644
index bb17731..0000000
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRowData.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.kafka;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.inlong.sort.configuration.Configuration;
-import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
-
-import java.util.HashMap;
-
-import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.createRowConverter;
-
-public abstract class KafkaSinkTestBaseForRowData extends KafkaSinkTestBase<RowData> {
-
-    @Override
-    protected void buildJob(StreamExecutionEnvironment env, TestingSource testingSource) {
-        KafkaSinkInfo kafkaSinkInfo = new KafkaSinkInfo(fieldInfos, brokerConnStr, topic, null);
-        DataFormatConverters.RowConverter rowConverter = createRowConverter(kafkaSinkInfo);
-        env.addSource(testingSource).map(rowConverter::toInternal).returns(RowData.class).addSink(
-                buildKafkaSink(kafkaSinkInfo, new HashMap<>(), serializationSchema, new Configuration())
-        );
-    }
-
-}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java
index 82c7ee7..2a08417 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java
@@ -34,7 +34,7 @@ import org.apache.inlong.sort.formats.common.RowFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.serialization.AvroSerializationInfo;
-import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
+import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
 import org.apache.kafka.common.utils.Bytes;
 
 import java.io.IOException;
@@ -47,10 +47,10 @@ import java.util.Map;
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
 import static org.junit.Assert.assertEquals;
 
-public class RowToAvroKafkaSinkTest extends KafkaSinkTestBaseForRow {
+public class RowToAvroKafkaSinkTest extends KafkaSinkTestBase {
 
     @Override
-    protected void prepareData() {
+    protected void prepareData() throws IOException, ClassNotFoundException {
         fieldInfos = new FieldInfo[]{
                 new FieldInfo("f1", new StringFormatInfo()),
                 new FieldInfo("f2", new IntFormatInfo()),
@@ -68,7 +68,7 @@ public class RowToAvroKafkaSinkTest extends KafkaSinkTestBaseForRow {
                 ))
         };
         topic = "test_kafka_row_to_avro";
-        serializationSchema = RowSerializationSchemaFactory.build(fieldInfos, new AvroSerializationInfo());
+        serializationSchema = SerializationSchemaFactory.build(fieldInfos, new AvroSerializationInfo());
 
         prepareTestRows();
     }
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java
index 8a56347..db7fb04 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java
@@ -24,25 +24,26 @@ import org.apache.inlong.sort.formats.common.IntFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
-import org.apache.inlong.sort.singletenant.flink.serialization.RowDataSerializationSchemaFactory;
+import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
 import org.apache.kafka.common.utils.Bytes;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
-public class RowToCanalKafkaSinkTest extends KafkaSinkTestBaseForRowData {
+public class RowToCanalKafkaSinkTest extends KafkaSinkTestBase {
 
     @Override
-    protected void prepareData() {
+    protected void prepareData() throws IOException, ClassNotFoundException {
         topic = "test_kafka_row_to_canal";
         fieldInfos = new FieldInfo[]{
                 new FieldInfo("f1", new StringFormatInfo()),
                 new FieldInfo("f2", new IntFormatInfo())
         };
 
-        serializationSchema = RowDataSerializationSchemaFactory.build(
+        serializationSchema = SerializationSchemaFactory.build(
                 fieldInfos, new CanalSerializationInfo("sql", "literal", "null", true)
         );
 
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
index 7239dc7..66c964b 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
@@ -26,9 +26,10 @@ import org.apache.inlong.sort.formats.common.MapFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.serialization.JsonSerializationInfo;
-import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
+import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
 import org.apache.kafka.common.utils.Bytes;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -37,11 +38,11 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
-public class RowToJsonKafkaSinkTest extends KafkaSinkTestBaseForRow {
+public class RowToJsonKafkaSinkTest extends KafkaSinkTestBase {
     @Override
-    protected void prepareData() {
+    protected void prepareData() throws IOException, ClassNotFoundException {
         topic = "test_kafka_row_to_json";
-        serializationSchema = RowSerializationSchemaFactory.build(
+        serializationSchema = SerializationSchemaFactory.build(
                 new FieldInfo[]{
                         new FieldInfo("f1", new StringFormatInfo()),
                         new FieldInfo("f2", new MapFormatInfo(new StringFormatInfo(), new DoubleFormatInfo())),
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
index dbab9b6..999f8b5 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
@@ -22,21 +22,22 @@ import org.apache.flink.types.Row;
 import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
+import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
 import org.apache.kafka.common.utils.Bytes;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
-public class RowToStringKafkaSinkTest extends KafkaSinkTestBaseForRow {
+public class RowToStringKafkaSinkTest extends KafkaSinkTestBase {
 
     @Override
-    protected void prepareData() {
+    protected void prepareData() throws IOException, ClassNotFoundException {
         topic = "test_kafka_row_to_string";
-        serializationSchema = RowSerializationSchemaFactory.build(
+        serializationSchema = SerializationSchemaFactory.build(
                 new FieldInfo[]{
                         new FieldInfo("f1", new StringFormatInfo()),
                         new FieldInfo("f2", new DoubleFormatInfo())
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapperTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapperTest.java
new file mode 100644
index 0000000..15dca5e
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapperTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.serialization;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CustomDateFormatSerializationSchemaWrapperTest {
+
+    @Test
+    public void testFromDateAndTimeToString() throws IOException, ClassNotFoundException {
+        FieldInfo[] fieldInfos = new FieldInfo[]{
+                new FieldInfo("f1", new DateFormatInfo()),
+                new FieldInfo("f2", new TimeFormatInfo()),
+                new FieldInfo("f3", new TimestampFormatInfo()),
+                new FieldInfo("f4", IntFormatInfo.INSTANCE)
+        };
+
+        SerializationSchema<Row> stringSchema = SerializationSchemaFactory.build(fieldInfos, null);
+
+        CustomDateFormatSerializationSchemaWrapper schemaWrapper
+                = new CustomDateFormatSerializationSchemaWrapper(stringSchema, extractFormatInfos(fieldInfos));
+
+        Row testRow = Row.of(Date.valueOf("2022-02-15"), Time.valueOf("15:52:30"),
+                Timestamp.valueOf("2022-02-15 15:52:30"), 1);
+
+        Row resultRow = schemaWrapper.fromDateAndTimeToString(testRow);
+        assertTrue(resultRow.getField(0) instanceof String);
+        assertTrue(resultRow.getField(1) instanceof String);
+        assertTrue(resultRow.getField(2) instanceof String);
+
+        Row expectedRow = Row.of("2022-02-15", "15:52:30", "2022-02-15 15:52:30", 1);
+        assertEquals(expectedRow, resultRow);
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java
index c2894e9..00d3b79 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java
@@ -38,8 +38,10 @@ import java.io.IOException;
 
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowTypeInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.deepCopy;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class CommonUtilsTest {
 
@@ -296,4 +298,19 @@ public class CommonUtilsTest {
 
         assertEquals(expectedJsonNode, actualJsonNode);
     }
+
+    @Test
+    public void testDeepCopy() throws IOException, ClassNotFoundException {
+        FieldInfo[] fieldInfos = new FieldInfo[]{
+                new FieldInfo("f1", StringFormatInfo.INSTANCE),
+                new FieldInfo("f2", IntFormatInfo.INSTANCE)
+        };
+
+        FieldInfo[] copiedInfos = (FieldInfo[]) deepCopy(fieldInfos);
+        assertArrayEquals(fieldInfos, copiedInfos);
+
+        fieldInfos[0].setFormatInfo(IntFormatInfo.INSTANCE);
+        assertTrue(copiedInfos[0].getFormatInfo() instanceof StringFormatInfo);
+    }
+
 }