You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/16 11:10:33 UTC

[incubator-inlong] branch master updated: [INLONG-2346][Feature][InLong-Sort] Support avro and canal formats for sort sink (#2387)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 317ee86  [INLONG-2346][Feature][InLong-Sort] Support avro and canal formats for sort sink (#2387)
317ee86 is described below

commit 317ee8630afe3d1d86c96d0b7e063af44a12fdd5
Author: Kevin Wen <ke...@gmail.com>
AuthorDate: Wed Feb 16 19:10:28 2022 +0800

    [INLONG-2346][Feature][InLong-Sort] Support avro and canal formats for sort sink (#2387)
---
 inlong-sort/pom.xml                                |  54 ++---
 .../inlong/sort/configuration/Constants.java       |   2 +
 .../serialization/AvroSerializationInfo.java       |   8 +
 .../serialization/CanalSerializationInfo.java      |  75 ++++++
 .../inlong/sort/protocol/sink/KafkaSinkInfo.java   |   6 +-
 .../serialization/AvroSerializationInfoTest.java   |  36 +++
 .../serialization/CanalSerializationInfoTest.java  |  38 ++++
 inlong-sort/sort-dist/pom.xml                      |  39 ++++
 .../inlong/sort/formats/base/TableFormatUtils.java |  22 +-
 .../sort/formats/common/BinaryFormatInfo.java      |  63 +++++
 .../inlong/sort/formats/common/BinaryTypeInfo.java |  46 ++++
 .../inlong/sort/formats/common/FormatInfo.java     |   4 +-
 .../inlong/sort/formats/common/NullFormatInfo.java |  51 +++++
 .../inlong/sort/formats/common/NullTypeInfo.java   |  46 ++++
 .../sort/formats/common/TimestampFormatInfo.java   |   4 +-
 .../inlong/sort/formats/common/TypeInfo.java       |   4 +-
 .../sort/formats/common/BinaryFormatInfoTest.java  |  53 +++++
 .../formats/common/TimestampFormatInfoTest.java    |   1 +
 .../sort-formats/format-inlongmsg-base/pom.xml     |   1 +
 inlong-sort/sort-single-tenant/pom.xml             |  68 ++++++
 .../inlong/sort/singletenant/flink/Entrance.java   |   8 +-
 .../singletenant/flink/kafka/KafkaSinkBuilder.java |  50 +++-
 .../RowDataSerializationSchemaFactory.java         |  92 ++++++++
 .../RowSerializationSchemaFactory.java             |  21 +-
 .../sort/singletenant/flink/utils/CommonUtils.java |  47 ++++
 .../flink/kafka/KafkaSinkTestBase.java             |  42 ++--
 .../flink/kafka/KafkaSinkTestBaseForRow.java       |  45 ++++
 .../flink/kafka/KafkaSinkTestBaseForRowData.java   |  43 ++++
 .../flink/kafka/RowToAvroKafkaSinkTest.java        | 127 +++++++++++
 .../flink/kafka/RowToCanalKafkaSinkTest.java       |  87 +++++++
 .../flink/kafka/RowToJsonKafkaSinkTest.java        |  36 +--
 .../flink/kafka/RowToStringKafkaSinkTest.java      |  34 +--
 .../singletenant/flink/utils/CommonUtilsTest.java  | 253 +++++++++++++++++++++
 33 files changed, 1386 insertions(+), 120 deletions(-)

diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index 14cbd4f..d242dd2 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -168,6 +168,12 @@
 
             <dependency>
                 <groupId>org.apache.flink</groupId>
+                <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
                 <artifactId>flink-parquet_${flink.scala.binary.version}</artifactId>
                 <version>${flink.version}</version>
             </dependency>
@@ -185,6 +191,12 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-avro</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
+            <dependency>
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>
@@ -372,6 +384,12 @@
                 <artifactId>kafka_${flink.scala.binary.version}</artifactId>
                 <version>${kafka.version}</version>
                 <scope>test</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.zookeeper</groupId>
+                        <artifactId>zookeeper</artifactId>
+                    </exclusion>
+                </exclusions>
             </dependency>
 
             <dependency>
@@ -420,45 +438,13 @@
                     </argLine>
                 </configuration>
             </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>3.2.4</version>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <createDependencyReducedPom>false</createDependencyReducedPom>
-                            <shadedArtifactAttached>false</shadedArtifactAttached>
-                            <finalName>${project.artifactId}-${project.version}</finalName>
-                            <filters>
-                                <!-- Globally exclude log4j.properties from our JAR files. -->
-                                <filter>
-                                    <artifact>*</artifact>
-                                    <excludes>
-                                        <exclude>log4j.properties</exclude>
-                                        <exclude>log4j-test.properties</exclude>
-                                        <exclude>META-INF/*.SF</exclude>
-                                        <exclude>META-INF/*.DSA</exclude>
-                                        <exclude>META-INF/*.RSA</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
-                            <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-                            </transformers>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-deploy-plugin</artifactId>
                 <version>2.8.2</version>
             </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-source-plugin</artifactId>
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index c6e8067..a0ad42b 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -57,6 +57,8 @@ public class Constants {
 
     public static final String DESERIALIZATION_SCHEMA_UID = "deserialization_schema_uid";
 
+    public static final String CONVERTER_UID = "converter_uid";
+
     public static final String SINK_UID = "sink_uid";
 
     /**
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/AvroSerializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/AvroSerializationInfo.java
index c9a82cf..56c6d72 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/AvroSerializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/AvroSerializationInfo.java
@@ -21,4 +21,12 @@ public class AvroSerializationInfo implements SerializationInfo {
 
     private static final long serialVersionUID = 8446721117598285868L;
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        return o != null && getClass() == o.getClass();
+    }
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/CanalSerializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/CanalSerializationInfo.java
index 633b10b..5849114 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/CanalSerializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/serialization/CanalSerializationInfo.java
@@ -17,8 +17,83 @@
 
 package org.apache.inlong.sort.protocol.serialization;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
 public class CanalSerializationInfo implements SerializationInfo {
 
     private static final long serialVersionUID = 479443152335788151L;
 
+    @JsonProperty("timestamp_format_standard")
+    private final String timestampFormatStandard;
+
+    @JsonProperty("map_null_key_mod")
+    private final String mapNullKeyMod;
+
+    @JsonProperty("map_null_key_literal")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final String mapNullKeyLiteral;
+
+    @JsonProperty("encode_decimal_as_plain_number")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final boolean encodeDecimalAsPlainNumber;
+
+    @JsonCreator
+    public CanalSerializationInfo(
+            @JsonProperty("timestamp_format_standard") String timestampFormatStandard,
+            @JsonProperty("map_null_key_mod") String mapNullKeyMod,
+            @JsonProperty("map_null_key_literal") String mapNullKeyLiteral,
+            @JsonProperty("encode_decimal_as_plain_number") boolean encodeDecimalAsPlainNumber
+    ) {
+        this.timestampFormatStandard = checkNotNull(timestampFormatStandard).toUpperCase();
+        this.mapNullKeyMod = checkNotNull(mapNullKeyMod).toUpperCase();
+        this.mapNullKeyLiteral = mapNullKeyLiteral;
+        this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+    }
+
+    @JsonProperty("timestamp_format_standard")
+    public String getTimestampFormatStandard() {
+        return timestampFormatStandard;
+    }
+
+    @JsonProperty("map_null_key_mod")
+    public String getMapNullKeyMod() {
+        return mapNullKeyMod;
+    }
+
+    @JsonProperty("map_null_key_literal")
+    public String getMapNullKeyLiteral() {
+        return mapNullKeyLiteral;
+    }
+
+    @JsonProperty("encode_decimal_as_plain_number")
+    public boolean isEncodeDecimalAsPlainNumber() {
+        return encodeDecimalAsPlainNumber;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CanalSerializationInfo that = (CanalSerializationInfo) o;
+        return encodeDecimalAsPlainNumber == that.encodeDecimalAsPlainNumber
+                && Objects.equals(timestampFormatStandard, that.timestampFormatStandard)
+                && Objects.equals(mapNullKeyMod, that.mapNullKeyMod)
+                && Objects.equals(mapNullKeyLiteral, that.mapNullKeyLiteral);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(timestampFormatStandard, mapNullKeyMod, mapNullKeyLiteral, encodeDecimalAsPlainNumber);
+    }
+
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/KafkaSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/KafkaSinkInfo.java
index b4922f5..90f55c1 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/KafkaSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/KafkaSinkInfo.java
@@ -24,6 +24,8 @@ import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
 
 import java.util.Objects;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 public class KafkaSinkInfo extends SinkInfo {
 
     private static final long serialVersionUID = 161617117094475954L;
@@ -45,8 +47,8 @@ public class KafkaSinkInfo extends SinkInfo {
             @JsonProperty("serialization_info") SerializationInfo serializationInfo
     ) {
         super(fields);
-        this.address = address;
-        this.topic = topic;
+        this.address = checkNotNull(address);
+        this.topic = checkNotNull(topic);
         this.serializationInfo = serializationInfo;
     }
 
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/serialization/AvroSerializationInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/serialization/AvroSerializationInfoTest.java
new file mode 100644
index 0000000..0a1e175
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/serialization/AvroSerializationInfoTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.serialization;
+
+import org.apache.inlong.sort.protocol.ProtocolBaseTest;
+
+public class AvroSerializationInfoTest extends ProtocolBaseTest {
+
+    @Override
+    public void init() {
+        expectedObject = new AvroSerializationInfo();
+
+        expectedJson = "{\"type\":\"avro\"}";
+
+        equalObj1 = expectedObject;
+        equalObj2 = new AvroSerializationInfo();
+        unequalObj = null;
+    }
+
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/serialization/CanalSerializationInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/serialization/CanalSerializationInfoTest.java
new file mode 100644
index 0000000..b0ad5fb
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/serialization/CanalSerializationInfoTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.serialization;
+
+import org.apache.inlong.sort.protocol.ProtocolBaseTest;
+
+public class CanalSerializationInfoTest extends ProtocolBaseTest {
+
+    @Override
+    public void init() {
+        expectedObject = new CanalSerializationInfo("Sql", "Literal", null, false);
+        expectedJson = "{\n"
+                + "  \"type\" : \"canal\",\n"
+                + "  \"timestamp_format_standard\" : \"Sql\",\n"
+                + "  \"map_null_key_mod\" : \"Literal\"\n"
+                + "}";
+        equalObj1 = expectedObject;
+        equalObj2 = new CanalSerializationInfo("Sql", "Literal", null, false);
+        unequalObj = new CanalSerializationInfo("Sql", "Literal", null, true);
+    }
+
+}
diff --git a/inlong-sort/sort-dist/pom.xml b/inlong-sort/sort-dist/pom.xml
index b5538da..a4646f1 100644
--- a/inlong-sort/sort-dist/pom.xml
+++ b/inlong-sort/sort-dist/pom.xml
@@ -48,4 +48,43 @@
             <version>${project.version}</version>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.4</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <finalName>${project.artifactId}-${project.version}</finalName>
+                            <filters>
+                                <!-- Globally exclude log4j.properties from our JAR files. -->
+                                <filter>
+                                    <artifact>*</artifact>
+                                    <excludes>
+                                        <exclude>log4j.properties</exclude>
+                                        <exclude>log4j-test.properties</exclude>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index d3abb01..40de211 100644
--- a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++ b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
 import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -47,6 +48,7 @@ import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.NullType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TimeType;
@@ -57,6 +59,8 @@ import org.apache.flink.types.Row;
 import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
 import org.apache.inlong.sort.formats.common.ArrayTypeInfo;
 import org.apache.inlong.sort.formats.common.BasicFormatInfo;
+import org.apache.inlong.sort.formats.common.BinaryFormatInfo;
+import org.apache.inlong.sort.formats.common.BinaryTypeInfo;
 import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
 import org.apache.inlong.sort.formats.common.BooleanTypeInfo;
 import org.apache.inlong.sort.formats.common.ByteFormatInfo;
@@ -77,6 +81,7 @@ import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.LongTypeInfo;
 import org.apache.inlong.sort.formats.common.MapFormatInfo;
 import org.apache.inlong.sort.formats.common.MapTypeInfo;
+import org.apache.inlong.sort.formats.common.NullFormatInfo;
 import org.apache.inlong.sort.formats.common.RowFormatInfo;
 import org.apache.inlong.sort.formats.common.RowTypeInfo;
 import org.apache.inlong.sort.formats.common.ShortFormatInfo;
@@ -94,6 +99,9 @@ import org.apache.inlong.sort.formats.common.TypeInfo;
  */
 public class TableFormatUtils {
 
+    // to support avro format, precision must be less than 3
+    private static final int DEFAULT_PRECISION_FOR_TIMESTAMP = 2;
+
     /**
      * Returns the {@link DeserializationSchema} described by the given
      * properties.
@@ -299,6 +307,10 @@ public class TableFormatUtils {
             }
 
             return new RowFormatInfo(fieldNames, fieldFormatInfos);
+        } else if (logicalType instanceof BinaryType) {
+            return BinaryFormatInfo.INSTANCE;
+        } else if (logicalType instanceof NullType) {
+            return NullFormatInfo.INSTANCE;
         } else {
             throw new UnsupportedOperationException();
         }
@@ -331,7 +343,7 @@ public class TableFormatUtils {
         } else if (formatInfo instanceof DateFormatInfo) {
             return new DateType();
         } else if (formatInfo instanceof TimestampFormatInfo) {
-            return new TimestampType();
+            return new TimestampType(DEFAULT_PRECISION_FOR_TIMESTAMP);
         } else if (formatInfo instanceof ArrayFormatInfo) {
             FormatInfo elementFormatInfo = ((ArrayFormatInfo) formatInfo).getElementFormatInfo();
             return new ArrayType(deriveLogicalType(elementFormatInfo));
@@ -350,6 +362,10 @@ public class TableFormatUtils {
                 logicalTypes[i] = deriveLogicalType(formatInfos[i]);
             }
             return RowType.of(logicalTypes, rowFormatInfo.getFieldNames());
+        } else if (formatInfo instanceof BinaryFormatInfo) {
+            return new BinaryType();
+        } else if (formatInfo instanceof NullFormatInfo) {
+            return new NullType();
         } else {
             throw new UnsupportedOperationException();
         }
@@ -386,6 +402,8 @@ public class TableFormatUtils {
             return Types.SQL_TIME;
         } else if (typeInfo instanceof TimestampTypeInfo) {
             return Types.SQL_TIMESTAMP;
+        } else if (typeInfo instanceof BinaryTypeInfo) {
+            return Types.PRIMITIVE_ARRAY(Types.BYTE);
         } else if (typeInfo instanceof ArrayTypeInfo) {
             ArrayTypeInfo arrayTypeInfo = (ArrayTypeInfo) typeInfo;
             TypeInfo elementTypeInfo =
@@ -414,7 +432,7 @@ public class TableFormatUtils {
 
             return Types.ROW_NAMED(fieldNames, fieldTypes);
         } else {
-            throw new IllegalStateException("Unexpected format.");
+            throw new IllegalStateException("Unexpected type info " + typeInfo + ".");
         }
     }
 
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryFormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryFormatInfo.java
new file mode 100644
index 0000000..b8e5774
--- /dev/null
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryFormatInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+import java.nio.charset.StandardCharsets;
+
+public class BinaryFormatInfo implements BasicFormatInfo<byte[]> {
+
+    private static final long serialVersionUID = 8379022656220694709L;
+
+    public static final BinaryFormatInfo INSTANCE = new BinaryFormatInfo();
+
+    @Override
+    public String serialize(byte[] record) throws Exception {
+        return new String(record, StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public byte[] deserialize(String text) throws Exception {
+        return text.getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public TypeInfo getTypeInfo() {
+        return BinaryTypeInfo.INSTANCE;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "BinaryFormatInfo";
+    }
+
+}
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryTypeInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryTypeInfo.java
new file mode 100644
index 0000000..a1ab577
--- /dev/null
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryTypeInfo.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+public class BinaryTypeInfo implements TypeInfo {
+
+    private static final long serialVersionUID = -5950082561854221079L;
+
+    public static final BinaryTypeInfo INSTANCE = new BinaryTypeInfo();
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "BinaryTypeInfo";
+    }
+
+}
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
index a443e54..79b8411 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
@@ -42,7 +42,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
         @JsonSubTypes.Type(name = "timestamp", value = TimestampFormatInfo.class),
         @JsonSubTypes.Type(name = "array", value = ArrayFormatInfo.class),
         @JsonSubTypes.Type(name = "map", value = MapFormatInfo.class),
-        @JsonSubTypes.Type(name = "row", value = RowFormatInfo.class)
+        @JsonSubTypes.Type(name = "row", value = RowFormatInfo.class),
+        @JsonSubTypes.Type(name = "binary", value = BinaryFormatInfo.class),
+        @JsonSubTypes.Type(name = "null", value = NullFormatInfo.class)
 })
 public interface FormatInfo extends Serializable {
 
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/NullFormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/NullFormatInfo.java
new file mode 100644
index 0000000..baf148a
--- /dev/null
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/NullFormatInfo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+public class NullFormatInfo implements FormatInfo {
+
+    private static final long serialVersionUID = 4672953842346988594L;
+
+    public static final NullFormatInfo INSTANCE = new NullFormatInfo();
+
+    @Override
+    public TypeInfo getTypeInfo() {
+        return NullTypeInfo.INSTANCE;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "NullFormatInfo";
+    }
+
+}
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/NullTypeInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/NullTypeInfo.java
new file mode 100644
index 0000000..5c5e59c
--- /dev/null
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/NullTypeInfo.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+public class NullTypeInfo implements TypeInfo {
+    // Type info for the null type; it declares no fields, so every instance is interchangeable.
+    private static final long serialVersionUID = -8808899337434251627L;
+    // Shared instance; no constructor is declared, so further instances can still be created.
+    public static final NullTypeInfo INSTANCE = new NullTypeInfo();
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        // With no state to compare, equality reduces to an exact class match.
+        return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "NullTypeInfo";
+    }
+
+}
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
index 69b0882..ac7c307 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
@@ -54,8 +54,8 @@ public class TimestampFormatInfo implements BasicFormatInfo<Timestamp> {
         this.format = format;
 
         if (!format.equals("MICROS")
-                    && !format.equals("MILLIS")
-                    && !format.equals("SECONDS")) {
+                && !format.equals("MILLIS")
+                && !format.equals("SECONDS")) {
             this.simpleDateFormat = new SimpleDateFormat(format);
         } else {
             this.simpleDateFormat = null;
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java
index fdadbcf..626f620 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TypeInfo.java
@@ -41,7 +41,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
         @JsonSubTypes.Type(name = "timestamp", value = TimestampTypeInfo.class),
         @JsonSubTypes.Type(name = "array", value = ArrayTypeInfo.class),
         @JsonSubTypes.Type(name = "map", value = MapTypeInfo.class),
-        @JsonSubTypes.Type(name = "row", value = RowTypeInfo.class)
+        @JsonSubTypes.Type(name = "row", value = RowTypeInfo.class),
+        @JsonSubTypes.Type(name = "binary", value = BinaryTypeInfo.class),
+        @JsonSubTypes.Type(name = "null", value = NullTypeInfo.class)
 })
 public interface TypeInfo extends Serializable {
 
diff --git a/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/BinaryFormatInfoTest.java b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/BinaryFormatInfoTest.java
new file mode 100644
index 0000000..338ad71
--- /dev/null
+++ b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/BinaryFormatInfoTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class BinaryFormatInfoTest extends FormatInfoTestBase {
+    // Checks BinaryFormatInfo.INSTANCE: serialize/deserialize against the UTF-8 bytes of a sample string.
+    @Override
+    Collection<FormatInfo> createFormatInfos() {
+        return Collections.singletonList(BinaryFormatInfo.INSTANCE);
+    }
+
+    @Test
+    public void testSerialize() throws Exception {
+        assertEquals(
+                "testString",
+                BinaryFormatInfo.INSTANCE.serialize("testString".getBytes(StandardCharsets.UTF_8))
+        );
+    }
+
+    @Test
+    public void testDeserialize() throws Exception {
+        assertArrayEquals(
+                "testString".getBytes(StandardCharsets.UTF_8),
+                BinaryFormatInfo.INSTANCE.deserialize("testString")
+        );
+    }
+
+}
diff --git a/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/TimestampFormatInfoTest.java b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/TimestampFormatInfoTest.java
index 8e764b9..97e002a 100644
--- a/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/TimestampFormatInfoTest.java
+++ b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/TimestampFormatInfoTest.java
@@ -24,6 +24,7 @@ import java.sql.Timestamp;
 import java.text.ParseException;
 import java.util.Collection;
 import java.util.Collections;
+
 import org.junit.Test;
 
 /**
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
index e6f0d95..3a97cad 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
@@ -106,6 +106,7 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
                             <relocations>
                                 <relocation>
                                     <pattern>com.google.common</pattern>
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index c2d8115..cb8e925 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -45,6 +45,12 @@
             <groupId>org.apache.inlong</groupId>
             <artifactId>sort-connectors</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -74,11 +80,23 @@
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-flink</artifactId>
             <version>0.12.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -91,6 +109,17 @@
             <artifactId>flink-json</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- test -->
         <dependency>
             <groupId>org.apache.kafka</groupId>
@@ -125,4 +154,43 @@
         </dependency>
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.4</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <finalName>${project.artifactId}-${project.version}</finalName>
+                            <filters>
+                                <!-- Globally exclude log4j.properties from our JAR files. -->
+                                <filter>
+                                    <artifact>*</artifact>
+                                    <excludes>
+                                        <exclude>log4j.properties</exclude>
+                                        <exclude>log4j-test.properties</exclude>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>
\ No newline at end of file
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index cfbd134..330cf72 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.sort.singletenant.flink;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
+import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSinkStream;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
@@ -159,11 +159,7 @@ public class Entrance {
                         .build();
                 break;
             case Constants.SINK_TYPE_KAFKA:
-                sourceStream
-                        .addSink(buildKafkaSink((KafkaSinkInfo) sinkInfo, properties, config))
-                        .uid(Constants.SINK_UID)
-                        .name("Kafka Sink")
-                        .setParallelism(sinkParallelism);
+                buildKafkaSinkStream(sourceStream, (KafkaSinkInfo) sinkInfo, properties, config, sinkParallelism);
                 break;
             default:
                 throw new IllegalArgumentException("Unsupported sink type " + sinkType);
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
index 01faf32..798a69f 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
@@ -18,31 +18,38 @@
 package org.apache.inlong.sort.singletenant.flink.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.types.Row;
 import org.apache.inlong.sort.configuration.Configuration;
-import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
+import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
 import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
+import org.apache.inlong.sort.singletenant.flink.serialization.RowDataSerializationSchemaFactory;
+import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
 import org.apache.kafka.clients.producer.ProducerConfig;
 
 import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.inlong.sort.configuration.Constants.SINK_KAFKA_PRODUCER_POOL_SIZE;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.createRowConverter;
 
 public class KafkaSinkBuilder {
 
-    public static SinkFunction<Row> buildKafkaSink(
+    public static <T> SinkFunction<T> buildKafkaSink(
             KafkaSinkInfo kafkaSinkInfo,
             Map<String, Object> properties,
+            SerializationSchema<T> serializationSchema,
             Configuration config
     ) {
         String topic = kafkaSinkInfo.getTopic();
         Properties producerProperties = buildProducerProperties(properties, kafkaSinkInfo.getAddress());
-        SerializationSchema<Row> serializationSchema =
-                RowSerializationSchemaFactory.build(kafkaSinkInfo, kafkaSinkInfo.getSerializationInfo());
 
         return new FlinkKafkaProducer<>(
                 topic,
@@ -60,4 +67,39 @@ public class KafkaSinkBuilder {
         producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, address);
         return producerProperties;
     }
+
+    public static void buildKafkaSinkStream(
+            DataStream<Row> sourceStream,
+            KafkaSinkInfo kafkaSinkInfo,
+            Map<String, Object> properties,
+            Configuration config,
+            int sinkParallelism
+    ) {
+        SerializationInfo serializationInfo = kafkaSinkInfo.getSerializationInfo();
+        if (serializationInfo instanceof CanalSerializationInfo) {
+            DataFormatConverters.RowConverter rowConverter = createRowConverter(kafkaSinkInfo);
+            DataStream<RowData> dataStream = sourceStream
+                    .map(rowConverter::toInternal)
+                    .uid(Constants.CONVERTER_UID)
+                    .name("Row to RowData Converter")
+                    .setParallelism(sinkParallelism);
+            // The canal schema serializes RowData, which is why the Row stream was mapped first.
+            SerializationSchema<RowData> schema = RowDataSerializationSchemaFactory.build(
+                    kafkaSinkInfo.getFields(), kafkaSinkInfo.getSerializationInfo());
+            // Same sink uid, name and parallelism as the Row branch below.
+            dataStream
+                    .addSink(buildKafkaSink(kafkaSinkInfo, properties, schema, config))
+                    .uid(Constants.SINK_UID)
+                    .name("Kafka Sink")
+                    .setParallelism(sinkParallelism);
+        } else { // every other serialization info is serialized directly from the Row stream
+            SerializationSchema<Row> schema = RowSerializationSchemaFactory.build(
+                    kafkaSinkInfo.getFields(), kafkaSinkInfo.getSerializationInfo());
+            sourceStream
+                    .addSink(buildKafkaSink(kafkaSinkInfo, properties, schema, config))
+                    .uid(Constants.SINK_UID)
+                    .name("Kafka Sink")
+                    .setParallelism(sinkParallelism);
+        }
+    }
 }
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowDataSerializationSchemaFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowDataSerializationSchemaFactory.java
new file mode 100644
index 0000000..69d7378
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowDataSerializationSchemaFactory.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.serialization;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.formats.json.canal.CanalJsonSerializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
+import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
+
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowType;
+
+public class RowDataSerializationSchemaFactory {
+    // Builds SerializationSchema<RowData> from a SerializationInfo; only CanalSerializationInfo is handled.
+    private static final String CANAL_TIMESTAMP_STANDARD_SQL = "SQL";
+    private static final String CANAL_TIMESTAMP_STANDARD_ISO = "ISO_8601";
+    private static final String CANAL_MAP_NULL_KEY_MODE_FAIL = "FAIL";
+    private static final String CANAL_MAP_NULL_KEY_MODE_DROP = "DROP";
+    private static final String CANAL_MAP_NULL_KEY_MODE_LITERAL = "LITERAL";
+    // Fallback used when the serialization info supplies a null or empty map-null-key literal.
+    private static final String CANAL_MAP_NULL_KEY_LITERAL_DEFAULT = "null";
+
+    public static SerializationSchema<RowData> build(FieldInfo[] fieldInfos, SerializationInfo serializationInfo) {
+        if (serializationInfo instanceof CanalSerializationInfo) {
+            return buildCanalRowDataSerializationSchema(fieldInfos, (CanalSerializationInfo) serializationInfo);
+        }
+        // Anything else (including null) is rejected instead of falling back to another format.
+        throw new IllegalArgumentException("Unsupported RowData serialization info: " + serializationInfo);
+    }
+
+    private static SerializationSchema<RowData> buildCanalRowDataSerializationSchema(
+            FieldInfo[] fieldInfos,
+            CanalSerializationInfo canalSerializationInfo
+    ) {
+        String mapNullKeyLiteral = canalSerializationInfo.getMapNullKeyLiteral();
+        if (StringUtils.isEmpty(mapNullKeyLiteral)) {
+            mapNullKeyLiteral = CANAL_MAP_NULL_KEY_LITERAL_DEFAULT;
+        }
+        // The field infos become the RowType handed to the canal schema; the two enums are parsed below.
+        RowType rowType = convertFieldInfosToRowType(fieldInfos);
+        return new CanalJsonSerializationSchema(
+                rowType,
+                getTimestampFormatStandard(canalSerializationInfo.getTimestampFormatStandard()),
+                getMapNullKeyMode(canalSerializationInfo.getMapNullKeyMod()),
+                mapNullKeyLiteral,
+                canalSerializationInfo.isEncodeDecimalAsPlainNumber()
+        );
+    }
+
+    private static TimestampFormat getTimestampFormatStandard(String input) {
+        if (CANAL_TIMESTAMP_STANDARD_SQL.equals(input)) {
+            return TimestampFormat.SQL;
+        } else if (CANAL_TIMESTAMP_STANDARD_ISO.equals(input)) {
+            return TimestampFormat.ISO_8601;
+        }
+        // Matching is exact and case-sensitive; null or any other value is rejected.
+        throw new IllegalArgumentException("Unsupported timestamp format standard: " + input);
+    }
+
+    private static JsonOptions.MapNullKeyMode getMapNullKeyMode(String input) {
+        if (CANAL_MAP_NULL_KEY_MODE_FAIL.equals(input)) {
+            return JsonOptions.MapNullKeyMode.FAIL;
+        } else if (CANAL_MAP_NULL_KEY_MODE_DROP.equals(input)) {
+            return JsonOptions.MapNullKeyMode.DROP;
+        } else if (CANAL_MAP_NULL_KEY_MODE_LITERAL.equals(input)) {
+            return JsonOptions.MapNullKeyMode.LITERAL;
+        }
+        // Matching is exact and case-sensitive; null or any other value is rejected.
+        throw new IllegalArgumentException("Unsupported map null key mode: " + input);
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowSerializationSchemaFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowSerializationSchemaFactory.java
index a03f037..b2eb961 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowSerializationSchemaFactory.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/RowSerializationSchemaFactory.java
@@ -19,22 +19,26 @@ package org.apache.inlong.sort.singletenant.flink.serialization;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.AvroRowSerializationSchema;
 import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.types.Row;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.serialization.AvroSerializationInfo;
 import org.apache.inlong.sort.protocol.serialization.JsonSerializationInfo;
 import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
-import org.apache.inlong.sort.protocol.sink.SinkInfo;
 
 import java.nio.charset.StandardCharsets;
 
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowTypeInfo;
 
 public class RowSerializationSchemaFactory {
 
-    public static SerializationSchema<Row> build(SinkInfo sinkInfo, SerializationInfo serializationInfo) {
+    public static SerializationSchema<Row> build(FieldInfo[] fieldInfos, SerializationInfo serializationInfo) {
         if (serializationInfo instanceof JsonSerializationInfo) {
-            return buildJsonRowSerializationSchema(sinkInfo.getFields());
+            return buildJsonRowSerializationSchema(fieldInfos);
+        } else if (serializationInfo instanceof AvroSerializationInfo) {
+            return buildAvroRowSerializationSchema(fieldInfos);
         } else {
             return buildStringRowSerializationSchema();
         }
@@ -46,14 +50,9 @@ public class RowSerializationSchemaFactory {
         return builder.withTypeInfo(rowTypeInfo).build();
     }
 
-    private static SerializationSchema<Row> buildCanalRowSerializationSchema(SerializationInfo serializationInfo) {
-        // TODO
-        return null;
-    }
-
-    private static SerializationSchema<Row> buildAvroRowSerializationSchema(SerializationInfo serializationInfo) {
-        // TODO
-        return null;
+    private static SerializationSchema<Row> buildAvroRowSerializationSchema(FieldInfo[] fieldInfos) {
+        String avroSchemaInJson = buildAvroRecordSchemaInJson(fieldInfos);
+        return new AvroRowSerializationSchema(avroSchemaInJson); // schema is supplied as its JSON string
+    }
 
     private static SerializationSchema<Row> buildStringRowSerializationSchema() {
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
index b3be3ef..0ee5b7a 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
@@ -18,14 +18,24 @@
 
 package org.apache.inlong.sort.singletenant.flink.utils;
 
+import org.apache.avro.Schema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.TableSchema.Builder;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
 import org.apache.inlong.sort.formats.common.TypeInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.sink.SinkInfo;
 
+import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema;
+import static org.apache.inlong.sort.formats.base.TableFormatUtils.deriveLogicalType;
+
 public class CommonUtils {
 
     public static TableSchema getTableSchema(SinkInfo sinkInfo) {
@@ -57,4 +67,41 @@ public class CommonUtils {
 
         return new org.apache.flink.api.java.typeutils.RowTypeInfo(typeInformationArray, fieldNames);
     }
+
+    public static String buildAvroRecordSchemaInJson(FieldInfo[] fieldInfos) {
+        int fieldLength = fieldInfos.length;
+        String[] fieldNames = new String[fieldLength];
+        FormatInfo[] fieldFormatInfos = new FormatInfo[fieldLength];
+        for (int i = 0; i < fieldLength; i++) {
+            fieldNames[i] = fieldInfos[i].getName();
+            fieldFormatInfos[i] = fieldInfos[i].getFormatInfo();
+        }
+        // Wrap the names/formats in one row format, derive its logical type, then convert that to Avro.
+        RowFormatInfo rowFormatInfo = new RowFormatInfo(fieldNames, fieldFormatInfos);
+        LogicalType logicalType = deriveLogicalType(rowFormatInfo);
+        Schema schema = convertToSchema(logicalType);
+        // NOTE(review): presumably a union here is [null, record] for a nullable row, hence index 1 - confirm.
+        if (schema.isUnion()) {
+            return schema.getTypes().get(1).toString();
+        }
+        return schema.toString();
+    }
+
+    public static DataFormatConverters.RowConverter createRowConverter(SinkInfo sinkInfo) {
+        DataType[] fieldDataTypes = getTableSchema(sinkInfo).getFieldDataTypes(); // from the sink's TableSchema
+        return new DataFormatConverters.RowConverter(fieldDataTypes);
+    }
+
+    public static RowType convertFieldInfosToRowType(FieldInfo[] fieldInfos) {
+        int fieldLength = fieldInfos.length;
+        String[] fieldNames = new String[fieldLength];
+        LogicalType[] fieldLogicalTypes = new LogicalType[fieldLength];
+        for (int i = 0; i < fieldLength; i++) {
+            fieldNames[i] = fieldInfos[i].getName();
+            fieldLogicalTypes[i] = TableFormatUtils.deriveLogicalType(fieldInfos[i].getFormatInfo());
+        }
+        // Names and logical types are positionally aligned with fieldInfos.
+        return RowType.of(fieldLogicalTypes, fieldNames);
+    }
+
 }
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
index e4ffcc8..9e21991 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
@@ -21,14 +21,12 @@ package org.apache.inlong.sort.singletenant.flink.kafka;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.types.Row;
-import org.apache.inlong.sort.configuration.Configuration;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
-import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -36,7 +34,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.junit.After;
 import org.junit.Before;
@@ -52,7 +52,6 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
@@ -62,11 +61,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
-import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
 import static org.apache.inlong.sort.singletenant.flink.utils.NetUtils.getUnusedLocalPort;
 import static org.junit.Assert.assertNull;
 
-public abstract class KafkaSinkTestBase {
+public abstract class KafkaSinkTestBase<T> {
 
     private static final Logger logger = LoggerFactory.getLogger(KafkaSinkTestBase.class);
 
@@ -78,16 +76,16 @@ public abstract class KafkaSinkTestBase {
     private TestingServer zkServer;
 
     private KafkaServer kafkaServer;
-    private String brokerConnStr;
     private AdminClient kafkaAdmin;
-    private KafkaConsumer<String, String> kafkaConsumer;
+    private KafkaConsumer<String, Bytes> kafkaConsumer;
     private Properties kafkaClientProperties;
+    protected String brokerConnStr;
 
     // prepare data below in subclass
     protected String topic;
     protected List<Row> testRows;
     protected FieldInfo[] fieldInfos;
-    protected SerializationInfo serializationInfo;
+    protected SerializationSchema<T> serializationSchema;
 
     @Before
     public void setup() throws Exception {
@@ -150,7 +148,7 @@ public abstract class KafkaSinkTestBase {
         kafkaClientProperties.setProperty("auto.offset.reset", "earliest");
         kafkaClientProperties.setProperty("max.poll.records", "1000");
         kafkaClientProperties.setProperty("key.deserializer", StringDeserializer.class.getName());
-        kafkaClientProperties.setProperty("value.deserializer", StringDeserializer.class.getName());
+        kafkaClientProperties.setProperty("value.deserializer", BytesDeserializer.class.getName());
     }
 
     private void addTopic() throws InterruptedException, TimeoutException, ExecutionException {
@@ -192,25 +190,19 @@ public abstract class KafkaSinkTestBase {
     }
 
     @Test(timeout = 3 * 60 * 1000)
-    public void testKafkaSink() throws InterruptedException {
+    public void testKafkaSink() throws Exception {
         TestingSource testingSource = createTestingSource();
         final ExecutorService executorService = Executors.newSingleThreadExecutor();
         CountDownLatch testFinishedCountDownLatch = new CountDownLatch(1);
         executorService.execute(() -> {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-            env.addSource(testingSource).addSink(
-                    buildKafkaSink(
-                            new KafkaSinkInfo(fieldInfos, brokerConnStr, topic, serializationInfo),
-                            new HashMap<>(),
-                            new Configuration()
-                    )
-            );
 
             try {
+                buildJob(env, testingSource);
                 env.execute();
                 testFinishedCountDownLatch.await();
             } catch (Exception e) {
-                e.printStackTrace();
+                logger.error("Error occurred when executing flink test job: ", e);
             }
         });
 
@@ -219,13 +211,15 @@ public abstract class KafkaSinkTestBase {
         testFinishedCountDownLatch.countDown();
     }
 
-    private void verify() throws InterruptedException {
+    protected abstract void buildJob(StreamExecutionEnvironment env, TestingSource testingSource);
+
+    private void verify() throws Exception {
         kafkaConsumer.subscribe(Collections.singleton(topic));
-        List<String> results = new ArrayList<>();
+        List<Bytes> results = new ArrayList<>();
         while (true) {
-            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
+            ConsumerRecords<String, Bytes> records = kafkaConsumer.poll(Duration.ofSeconds(1));
             if (!records.isEmpty()) {
-                for (ConsumerRecord<String, String> record : records) {
+                for (ConsumerRecord<String, Bytes> record : records) {
                     assertNull(record.key());
                     results.add(record.value());
                 }
@@ -244,7 +238,7 @@ public abstract class KafkaSinkTestBase {
         }
     }
 
-    protected abstract void verifyData(List<String> results);
+    protected abstract void verifyData(List<Bytes> results) throws IOException;
 
     private TestingSource createTestingSource() {
         TestingSource testingSource = new TestingSource();
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRow.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRow.java
new file mode 100644
index 0000000..9a612c7
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRow.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.kafka;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
+
+import java.util.HashMap;
+
+import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
+
+/**
+ * Kafka sink test base for jobs that hand {@link Row} records to the sink without any conversion step.
+ */
+public abstract class KafkaSinkTestBaseForRow extends KafkaSinkTestBase<Row> {
+
+    /**
+     * Connects the testing source directly to a Kafka sink that uses {@code serializationSchema}.
+     *
+     * @param env the execution environment the job is built on
+     * @param testingSource the testing source feeding the job
+     */
+    @Override
+    protected void buildJob(StreamExecutionEnvironment env, TestingSource testingSource) {
+        // The sink info carries no field infos and no serialization info; the schema is passed explicitly.
+        KafkaSinkInfo sinkInfo = new KafkaSinkInfo(new FieldInfo[]{}, brokerConnStr, topic, null);
+        env.addSource(testingSource).addSink(
+                buildKafkaSink(sinkInfo, new HashMap<>(), serializationSchema, new Configuration())
+        );
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRowData.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRowData.java
new file mode 100644
index 0000000..bb17731
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBaseForRowData.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.kafka;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
+
+import java.util.HashMap;
+
+import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.createRowConverter;
+
+/**
+ * Kafka sink test base for jobs that convert each source row to {@link RowData} before it reaches the sink.
+ */
+public abstract class KafkaSinkTestBaseForRowData extends KafkaSinkTestBase<RowData> {
+
+    /**
+     * Builds the job: testing source, then a row-to-{@link RowData} conversion, then the Kafka sink.
+     *
+     * @param env the execution environment the job is built on
+     * @param testingSource the testing source feeding the job
+     */
+    @Override
+    protected void buildJob(StreamExecutionEnvironment env, TestingSource testingSource) {
+        KafkaSinkInfo sinkInfo = new KafkaSinkInfo(fieldInfos, brokerConnStr, topic, null);
+        DataFormatConverters.RowConverter converter = createRowConverter(sinkInfo);
+        env.addSource(testingSource)
+                .map(row -> converter.toInternal(row))
+                .returns(RowData.class)
+                .addSink(buildKafkaSink(sinkInfo, new HashMap<>(), serializationSchema, new Configuration()));
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java
new file mode 100644
index 0000000..82c7ee7
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.kafka;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
+import org.apache.inlong.sort.formats.common.BinaryFormatInfo;
+import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.NullFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.serialization.AvroSerializationInfo;
+import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
+import org.apache.kafka.common.utils.Bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that {@link Row} records written to Kafka with the Avro serialization schema can be read back
+ * with a generic Avro deserializer and match the rows that were sent.
+ */
+public class RowToAvroKafkaSinkTest extends KafkaSinkTestBaseForRow {
+
+    /**
+     * Declares the record fields, the topic and the Avro serialization schema, then creates the test rows.
+     */
+    @Override
+    protected void prepareData() {
+        fieldInfos = new FieldInfo[]{
+                new FieldInfo("f1", new StringFormatInfo()),
+                new FieldInfo("f2", new IntFormatInfo()),
+                new FieldInfo("f3", new NullFormatInfo()),
+                new FieldInfo("f4", new BinaryFormatInfo()),
+                new FieldInfo("f5", new MapFormatInfo(
+                        new StringFormatInfo(),
+                        new RowFormatInfo(
+                                new String[]{"f51", "f52"},
+                                new FormatInfo[]{
+                                        new IntFormatInfo(),
+                                        new ArrayFormatInfo(new DoubleFormatInfo())
+                                }
+                        )
+                ))
+        };
+        topic = "test_kafka_row_to_avro";
+        serializationSchema = RowSerializationSchemaFactory.build(fieldInfos, new AvroSerializationInfo());
+
+        prepareTestRows();
+    }
+
+    /**
+     * Creates three rows matching {@code fieldInfos}: a string, an int, a null, a byte array and a map of rows.
+     */
+    private void prepareTestRows() {
+        testRows = new ArrayList<>();
+        Map<String, Row> map1 = new HashMap<>();
+        Double[] doubles1 = new Double[]{1.0, 2.0, 3.0};
+        map1.put("AnnaMap", Row.of(1, doubles1));
+        testRows.add(Row.of("Anna", 100, null, new byte[]{1, 2, 3}, map1));
+
+        Map<String, Row> map2 = new HashMap<>();
+        Double[] doubles2 = new Double[]{4.0, 5.0, 6.0};
+        map2.put("LisaMap", Row.of(2, doubles2));
+        testRows.add(Row.of("Lisa", 50, null, new byte[]{4, 5, 6}, map2));
+
+        Map<String, Row> map3 = new HashMap<>();
+        Double[] doubles3 = new Double[]{7.0, 8.0, 9.0};
+        map3.put("BobMap", Row.of(3, doubles3));
+        testRows.add(Row.of("Bob", 10, null, new byte[]{7, 8, 9}, map3));
+    }
+
+    /**
+     * Decodes every Kafka record with the Avro schema derived from {@code fieldInfos} and compares the
+     * string form of the decoded rows with the string form of {@code testRows}, ignoring order.
+     *
+     * @param results the raw record values consumed from the topic
+     * @throws IOException if a record cannot be deserialized
+     */
+    @Override
+    protected void verifyData(List<Bytes> results) throws IOException {
+        AvroDeserializationSchema<GenericRecord> deserializationSchema = AvroDeserializationSchema.forGeneric(
+                new Schema.Parser().parse(buildAvroRecordSchemaInJson(fieldInfos)));
+
+        List<String> actualData = new ArrayList<>(results.size());
+        for (Bytes result : results) {
+            GenericRecord genericRecord = deserializationSchema.deserialize(result.get());
+            actualData.add(convertToRow(genericRecord).toString());
+        }
+        actualData.sort(String::compareTo);
+
+        List<String> expectedData = new ArrayList<>(testRows.size());
+        testRows.forEach(row -> expectedData.add(row.toString()));
+        expectedData.sort(String::compareTo);
+
+        assertEquals(expectedData, actualData);
+    }
+
+    /**
+     * Rebuilds a {@link Row} with the same five-field layout as the test rows from a decoded Avro record.
+     *
+     * @param genericRecord the decoded record holding the fields {@code f1} to {@code f5}
+     * @return the row used for the string comparison in {@code verifyData}
+     */
+    private static Row convertToRow(GenericRecord genericRecord) {
+        Row row = new Row(5);
+        row.setField(0, genericRecord.get("f1"));
+        row.setField(1, genericRecord.get("f2"));
+        row.setField(2, genericRecord.get("f3"));
+
+        // Copy only the readable bytes; ByteBuffer#array() would expose the whole backing array.
+        ByteBuffer f4 = ((ByteBuffer) genericRecord.get("f4")).duplicate();
+        byte[] f4Bytes = new byte[f4.remaining()];
+        f4.get(f4Bytes);
+        row.setField(3, f4Bytes);
+
+        // f5 is declared as a map of rows in fieldInfos; keys are assumed to be decoded as Utf8.
+        @SuppressWarnings("unchecked")
+        Map<Utf8, GenericRecord> f5 = (Map<Utf8, GenericRecord>) genericRecord.get("f5");
+        Map<String, Row> convertedMap = new HashMap<>();
+        for (Map.Entry<Utf8, GenericRecord> entry : f5.entrySet()) {
+            // Utf8#toString() decodes only the valid bytes as UTF-8; getBytes() may return a longer buffer.
+            GenericRecord value = entry.getValue();
+            convertedMap.put(entry.getKey().toString(), Row.of(value.get("f51"), value.get("f52")));
+        }
+        row.setField(4, convertedMap);
+        return row;
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java
new file mode 100644
index 0000000..8a56347
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.kafka;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
+import org.apache.inlong.sort.singletenant.flink.serialization.RowDataSerializationSchemaFactory;
+import org.apache.kafka.common.utils.Bytes;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies the Kafka records produced for rows of every {@link RowKind} when the canal serialization is used.
+ */
+public class RowToCanalKafkaSinkTest extends KafkaSinkTestBaseForRowData {
+
+    /**
+     * Sets the topic, the two-field layout and the canal serialization schema, then creates the test rows.
+     */
+    @Override
+    protected void prepareData() {
+        topic = "test_kafka_row_to_canal";
+        fieldInfos = new FieldInfo[]{
+                new FieldInfo("f1", new StringFormatInfo()),
+                new FieldInfo("f2", new IntFormatInfo())
+        };
+
+        serializationSchema = RowDataSerializationSchemaFactory.build(
+                fieldInfos, new CanalSerializationInfo("sql", "literal", "null", true)
+        );
+
+        prepareTestData();
+    }
+
+    /**
+     * Creates four rows, one for each of INSERT, DELETE, UPDATE_BEFORE and UPDATE_AFTER.
+     */
+    private void prepareTestData() {
+        testRows = new ArrayList<>();
+        Row row1 = Row.of("Anna", 100);
+        row1.setKind(RowKind.INSERT);
+        testRows.add(row1);
+
+        Row row2 = Row.of("Lisa", 90);
+        row2.setKind(RowKind.DELETE);
+        testRows.add(row2);
+
+        Row row3 = Row.of("Bob", 80);
+        row3.setKind(RowKind.UPDATE_BEFORE);
+        testRows.add(row3);
+
+        Row row4 = Row.of("Tom", 70);
+        row4.setKind(RowKind.UPDATE_AFTER);
+        testRows.add(row4);
+    }
+
+    /**
+     * Decodes the record values as UTF-8 text and compares them with the expected JSON, ignoring order.
+     *
+     * @param results the raw record values consumed from the topic
+     */
+    @Override
+    protected void verifyData(List<Bytes> results) {
+        List<String> actualData = new ArrayList<>();
+        // Decode with an explicit charset, as the sibling JSON and string sink tests do.
+        results.forEach(value -> actualData.add(new String(value.get(), StandardCharsets.UTF_8)));
+        actualData.sort(String::compareTo);
+
+        // The UPDATE_BEFORE row (Bob) is expected as DELETE and the UPDATE_AFTER row (Tom) as INSERT.
+        List<String> expectedData = new ArrayList<>();
+        expectedData.add("{\"data\":[{\"f1\":\"Bob\",\"f2\":80}],\"type\":\"DELETE\"}");
+        expectedData.add("{\"data\":[{\"f1\":\"Tom\",\"f2\":70}],\"type\":\"INSERT\"}");
+        expectedData.add("{\"data\":[{\"f1\":\"Lisa\",\"f2\":90}],\"type\":\"DELETE\"}");
+        expectedData.add("{\"data\":[{\"f1\":\"Anna\",\"f2\":100}],\"type\":\"INSERT\"}");
+        expectedData.sort(String::compareTo);
+
+        assertEquals(expectedData, actualData);
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
index 7bade20..7239dc7 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
@@ -26,7 +26,10 @@ import org.apache.inlong.sort.formats.common.MapFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.serialization.JsonSerializationInfo;
+import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
+import org.apache.kafka.common.utils.Bytes;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -34,22 +37,20 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
-public class RowToJsonKafkaSinkTest extends KafkaSinkTestBase {
+public class RowToJsonKafkaSinkTest extends KafkaSinkTestBaseForRow {
     @Override
     protected void prepareData() {
         topic = "test_kafka_row_to_json";
-        prepareKafkaSinkInfo();
-        prepareTestRows();
-    }
+        serializationSchema = RowSerializationSchemaFactory.build(
+                new FieldInfo[]{
+                        new FieldInfo("f1", new StringFormatInfo()),
+                        new FieldInfo("f2", new MapFormatInfo(new StringFormatInfo(), new DoubleFormatInfo())),
+                        new FieldInfo("f3", new ArrayFormatInfo(new IntFormatInfo()))
+                },
+                new JsonSerializationInfo()
+        );
 
-    private void prepareKafkaSinkInfo() {
-        fieldInfos = new FieldInfo[]{
-                new FieldInfo("f1", new StringFormatInfo()),
-                new FieldInfo("f2", new MapFormatInfo(new StringFormatInfo(), new DoubleFormatInfo())),
-                new FieldInfo("f3", new ArrayFormatInfo(new IntFormatInfo()))
-        };
-
-        serializationInfo = new JsonSerializationInfo();
+        prepareTestRows();
     }
 
     private void prepareTestRows() {
@@ -69,14 +70,17 @@ public class RowToJsonKafkaSinkTest extends KafkaSinkTestBase {
     }
 
     @Override
-    protected void verifyData(List<String> results) {
+    protected void verifyData(List<Bytes> results) {
+        List<String> actualData = new ArrayList<>();
+        results.forEach(value -> actualData.add(new String(value.get(), StandardCharsets.UTF_8)));
+        actualData.sort(String::compareTo);
+
         List<String> expectedData = new ArrayList<>();
         expectedData.add("{\"f1\":\"zhangsan\",\"f2\":{\"high\":170.5},\"f3\":[123]}");
         expectedData.add("{\"f1\":\"lisi\",\"f2\":{\"high\":180.5},\"f3\":[1234]}");
         expectedData.add("{\"f1\":\"wangwu\",\"f2\":{\"high\":190.5},\"f3\":[12345]}");
-
-        results.sort(String::compareTo);
         expectedData.sort(String::compareTo);
-        assertEquals(expectedData, results);
+
+        assertEquals(expectedData, actualData);
     }
 }
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
index fc10f1f..dbab9b6 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
@@ -22,30 +22,28 @@ import org.apache.flink.types.Row;
 import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.singletenant.flink.serialization.RowSerializationSchemaFactory;
+import org.apache.kafka.common.utils.Bytes;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
-public class RowToStringKafkaSinkTest extends KafkaSinkTestBase {
+public class RowToStringKafkaSinkTest extends KafkaSinkTestBaseForRow {
 
     @Override
     protected void prepareData() {
         topic = "test_kafka_row_to_string";
-        prepareKafkaSinkInfo();
-        prepareTestRows();
-    }
-
-    private void prepareKafkaSinkInfo() {
-        fieldInfos = new FieldInfo[]{
-                new FieldInfo("f1", new StringFormatInfo()),
-                new FieldInfo("f2", new DoubleFormatInfo())
-        };
-        serializationInfo = null;
-    }
+        serializationSchema = RowSerializationSchemaFactory.build(
+                new FieldInfo[]{
+                        new FieldInfo("f1", new StringFormatInfo()),
+                        new FieldInfo("f2", new DoubleFormatInfo())
+                },
+                null
+        );
 
-    private void prepareTestRows() {
         testRows = new ArrayList<>();
         testRows.add(Row.of("f1", 12.0));
         testRows.add(Row.of("f2", 12.1));
@@ -53,13 +51,15 @@ public class RowToStringKafkaSinkTest extends KafkaSinkTestBase {
     }
 
     @Override
-    protected void verifyData(List<String> results) {
+    protected void verifyData(List<Bytes> results) {
+        List<String> actualData = new ArrayList<>();
+        results.forEach(value -> actualData.add(new String(value.get(), StandardCharsets.UTF_8)));
+        actualData.sort(String::compareTo);
+
         List<String> expectedData = new ArrayList<>();
         testRows.forEach(row -> expectedData.add(row.toString()));
-
-        results.sort(String::compareTo);
         expectedData.sort(String::compareTo);
-        assertEquals(expectedData, results);
 
+        assertEquals(expectedData, actualData);
     }
 }
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java
index 0547202..c2894e9 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtilsTest.java
@@ -20,11 +20,23 @@ package org.apache.inlong.sort.singletenant.flink.utils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
 import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
+import org.apache.inlong.sort.formats.common.ByteFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.ShortFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.junit.Test;
 
+import java.io.IOException;
+
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowTypeInfo;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -43,4 +55,245 @@ public class CommonUtilsTest {
         assertEquals(Types.STRING, fieldTypesFlink[0]);
         assertEquals(Types.BOOLEAN, fieldTypesFlink[1]);
     }
+
+    private final ObjectMapper objectMapper = new ObjectMapper(); // parses expected and actual schema JSON
+
+    @Test // flat fields: each is expected as a ["null", <type>] union with default null
+    public void testBuildAvroRecordSchemaInJsonForNormalFields() throws IOException {
+        FieldInfo[] testFieldInfos = new FieldInfo[]{
+                new FieldInfo("f1", new StringFormatInfo()),
+                new FieldInfo("f2", new ByteFormatInfo()) // expected below as Avro "int"
+        };
+
+        JsonNode expectedJsonNode = objectMapper.readTree("{\n"
+                + "    \"type\":\"record\",\n"
+                + "    \"name\":\"record\",\n"
+                + "    \"fields\":[\n"
+                + "        {\n"
+                + "            \"name\":\"f1\",\n"
+                + "            \"type\":[\n"
+                + "                \"null\",\n"
+                + "                \"string\"\n"
+                + "            ],\n"
+                + "            \"default\":null\n"
+                + "        },\n"
+                + "        {\n"
+                + "            \"name\":\"f2\",\n"
+                + "            \"type\":[\n"
+                + "                \"null\",\n"
+                + "                \"int\"\n"
+                + "            ],\n"
+                + "            \"default\":null\n"
+                + "        }\n"
+                + "    ]\n"
+                + "}");
+
+        String actualJson = buildAvroRecordSchemaInJson(testFieldInfos);
+        JsonNode actualJsonNode = objectMapper.readTree(actualJson);
+
+        assertEquals(expectedJsonNode, actualJsonNode); // compares parsed JSON trees, not raw text
+    }
+
+    @Test // nested array/map/row fields: checks the expected schema for each nesting combination
+    public void testBuildAvroRecordSchemaInJsonForRecursiveFields() throws IOException {
+        FieldInfo[] testFieldInfos = new FieldInfo[]{
+                new FieldInfo("f1", new ArrayFormatInfo( // array of map whose values are array of array of short
+                        new MapFormatInfo(
+                                new StringFormatInfo(),
+                                new ArrayFormatInfo(new ArrayFormatInfo(new ShortFormatInfo()))
+                        )
+                )),
+                new FieldInfo("f2", new MapFormatInfo( // map of map of row(f21: int, f22: array of byte)
+                        new StringFormatInfo(),
+                        new MapFormatInfo(
+                                new StringFormatInfo(),
+                                new RowFormatInfo(
+                                        new String[]{"f21", "f22"},
+                                        new FormatInfo[]{new IntFormatInfo(), new ArrayFormatInfo(new ByteFormatInfo())}
+                                )
+                        )
+                )),
+                new FieldInfo("f3", new RowFormatInfo( // row(f31: array of string, f32: nested row)
+                        new String[]{"f31", "f32"},
+                        new FormatInfo[]{
+                                new ArrayFormatInfo(new StringFormatInfo()),
+                                new RowFormatInfo(
+                                        new String[]{"f321", "f322"},
+                                        new FormatInfo[]{
+                                                new ArrayFormatInfo(new IntFormatInfo()),
+                                                new MapFormatInfo(
+                                                        new StringFormatInfo(),
+                                                        new ArrayFormatInfo(new ByteFormatInfo())
+                                                )
+                                        }
+                                )
+                        }
+                ))
+        };
+
+        JsonNode expectedJsonNode = objectMapper.readTree("{\n"
+                + "    \"type\":\"record\",\n"
+                + "    \"name\":\"record\",\n"
+                + "    \"fields\":[\n"
+                + "        {\n"
+                + "            \"name\":\"f1\",\n"
+                + "            \"type\":[\n"
+                + "                \"null\",\n"
+                + "                {\n"
+                + "                    \"type\":\"array\",\n"
+                + "                    \"items\":[\n"
+                + "                        \"null\",\n"
+                + "                        {\n"
+                + "                            \"type\":\"map\",\n"
+                + "                            \"values\":[\n"
+                + "                                \"null\",\n"
+                + "                                {\n"
+                + "                                    \"type\":\"array\",\n"
+                + "                                    \"items\":[\n"
+                + "                                        \"null\",\n"
+                + "                                        {\n"
+                + "                                            \"type\":\"array\",\n"
+                + "                                            \"items\":[\n"
+                + "                                                \"null\",\n"
+                + "                                                \"int\"\n" // ShortFormatInfo is expected as "int"
+                + "                                            ]\n"
+                + "                                        }\n"
+                + "                                    ]\n"
+                + "                                }\n"
+                + "                            ]\n"
+                + "                        }\n"
+                + "                    ]\n"
+                + "                }\n"
+                + "            ],\n"
+                + "            \"default\":null\n"
+                + "        },\n"
+                + "        {\n"
+                + "            \"name\":\"f2\",\n"
+                + "            \"type\":[\n"
+                + "                \"null\",\n"
+                + "                {\n"
+                + "                    \"type\":\"map\",\n"
+                + "                    \"values\":[\n"
+                + "                        \"null\",\n"
+                + "                        {\n"
+                + "                            \"type\":\"map\",\n"
+                + "                            \"values\":[\n"
+                + "                                \"null\",\n"
+                + "                                {\n"
+                + "                                    \"type\":\"record\",\n"
+                + "                                    \"name\":\"record_f2\",\n" // expected name carries the field name f2
+                + "                                    \"fields\":[\n"
+                + "                                        {\n"
+                + "                                            \"name\":\"f21\",\n"
+                + "                                            \"type\":[\n"
+                + "                                                \"null\",\n"
+                + "                                                \"int\"\n"
+                + "                                            ],\n"
+                + "                                            \"default\":null\n"
+                + "                                        },\n"
+                + "                                        {\n"
+                + "                                            \"name\":\"f22\",\n"
+                + "                                            \"type\":[\n"
+                + "                                                \"null\",\n"
+                + "                                                {\n"
+                + "                                                    \"type\":\"array\",\n"
+                + "                                                    \"items\":[\n"
+                + "                                                        \"null\",\n"
+                + "                                                        \"int\"\n"
+                + "                                                    ]\n"
+                + "                                                }\n"
+                + "                                            ],\n"
+                + "                                            \"default\":null\n"
+                + "                                        }\n"
+                + "                                    ]\n"
+                + "                                }\n"
+                + "                            ]\n"
+                + "                        }\n"
+                + "                    ]\n"
+                + "                }\n"
+                + "            ],\n"
+                + "            \"default\":null\n"
+                + "        },\n"
+                + "        {\n"
+                + "            \"name\":\"f3\",\n"
+                + "            \"type\":[\n"
+                + "                \"null\",\n"
+                + "                {\n"
+                + "                    \"type\":\"record\",\n"
+                + "                    \"name\":\"record_f3\",\n"
+                + "                    \"fields\":[\n"
+                + "                        {\n"
+                + "                            \"name\":\"f31\",\n"
+                + "                            \"type\":[\n"
+                + "                                \"null\",\n"
+                + "                                {\n"
+                + "                                    \"type\":\"array\",\n"
+                + "                                    \"items\":[\n"
+                + "                                        \"null\",\n"
+                + "                                        \"string\"\n"
+                + "                                    ]\n"
+                + "                                }\n"
+                + "                            ],\n"
+                + "                            \"default\":null\n"
+                + "                        },\n"
+                + "                        {\n"
+                + "                            \"name\":\"f32\",\n"
+                + "                            \"type\":[\n"
+                + "                                \"null\",\n"
+                + "                                {\n"
+                + "                                    \"type\":\"record\",\n"
+                + "                                    \"name\":\"record_f3_f32\",\n" // expected name joins f3 and f32
+                + "                                    \"fields\":[\n"
+                + "                                        {\n"
+                + "                                            \"name\":\"f321\",\n"
+                + "                                            \"type\":[\n"
+                + "                                                \"null\",\n"
+                + "                                                {\n"
+                + "                                                    \"type\":\"array\",\n"
+                + "                                                    \"items\":[\n"
+                + "                                                        \"null\",\n"
+                + "                                                        \"int\"\n"
+                + "                                                    ]\n"
+                + "                                                }\n"
+                + "                                            ],\n"
+                + "                                            \"default\":null\n"
+                + "                                        },\n"
+                + "                                        {\n"
+                + "                                            \"name\":\"f322\",\n"
+                + "                                            \"type\":[\n"
+                + "                                                \"null\",\n"
+                + "                                                {\n"
+                + "                                                    \"type\":\"map\",\n"
+                + "                                                    \"values\":[\n"
+                + "                                                        \"null\",\n"
+                + "                                                        {\n"
+                + "                                                            \"type\":\"array\",\n"
+                + "                                                            \"items\":[\n"
+                + "                                                                \"null\",\n"
+                + "                                                                \"int\"\n"
+                + "                                                            ]\n"
+                + "                                                        }\n"
+                + "                                                    ]\n"
+                + "                                                }\n"
+                + "                                            ],\n"
+                + "                                            \"default\":null\n"
+                + "                                        }\n"
+                + "                                    ]\n"
+                + "                                }\n"
+                + "                            ],\n"
+                + "                            \"default\":null\n"
+                + "                        }\n"
+                + "                    ]\n"
+                + "                }\n"
+                + "            ],\n"
+                + "            \"default\":null\n"
+                + "        }\n"
+                + "    ]\n"
+                + "}");
+
+        String actualJson = buildAvroRecordSchemaInJson(testFieldInfos);
+        JsonNode actualJsonNode = objectMapper.readTree(actualJson);
+
+        assertEquals(expectedJsonNode, actualJsonNode); // compares parsed JSON trees, not raw text
+    }
 }