You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/09 09:50:08 UTC

[inlong] branch master updated: [INLONG-6078][Manager][Sort] Add VarBinary data type and extend binary for supporting length (#6079)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 22b2287f0 [INLONG-6078][Manager][Sort] Add VarBinary data type and extend binary for supporting length (#6079)
22b2287f0 is described below

commit 22b2287f0d0d23cbf061a3aa1b431be0ad42793b
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Sun Oct 9 17:50:02 2022 +0800

    [INLONG-6078][Manager][Sort] Add VarBinary data type and extend binary for supporting length (#6079)
---
 .../inlong/manager/common/enums/FieldType.java     |   1 +
 .../manager/pojo/fieldformat/BinaryFormat.java     |  32 ++++++
 .../manager/pojo/fieldformat/VarBinaryFormat.java  |  32 ++++++
 .../manager/pojo/sort/util/FieldFormatUtils.java   |  18 ++++
 .../manager/pojo/sort/util/FieldInfoUtils.java     |  20 +++-
 .../pojo/sort/util/FieldFormatUtilsTest.java       |  16 +++
 .../inlong/sort/formats/base/TableFormatUtils.java |  10 +-
 .../sort/formats/common/BinaryFormatInfo.java      |  34 +++++++
 .../inlong/sort/formats/common/FormatInfo.java     |   3 +-
 .../sort/formats/common/VarBinaryFormatInfo.java   | 112 +++++++++++++++++++++
 .../sort/formats/common/VarBinaryTypeInfo.java     |  49 +++++++++
 .../formats/common/VarBinaryFormatInfoTest.java}   |  52 ++++------
 12 files changed, 344 insertions(+), 35 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
index cd54828b4..2d95846d6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
@@ -34,6 +34,7 @@ public enum FieldType {
     FIXED,
     BYTE,
     BINARY,
+    VARBINARY,
     BOOLEAN,
     DATE,
     TIME,
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/fieldformat/BinaryFormat.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/fieldformat/BinaryFormat.java
new file mode 100644
index 000000000..4a0f4dc85
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/fieldformat/BinaryFormat.java
@@ -0,0 +1,32 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.fieldformat;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class BinaryFormat {
+
+    private Integer length;
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/fieldformat/VarBinaryFormat.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/fieldformat/VarBinaryFormat.java
new file mode 100644
index 000000000..2fc5ffac2
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/fieldformat/VarBinaryFormat.java
@@ -0,0 +1,32 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.fieldformat;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class VarBinaryFormat {
+
+    private Integer length;
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldFormatUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldFormatUtils.java
index a1262a1e0..f3077be28 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldFormatUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldFormatUtils.java
@@ -20,15 +20,33 @@ package org.apache.inlong.manager.pojo.sort.util;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.pojo.fieldformat.ArrayFormat;
+import org.apache.inlong.manager.pojo.fieldformat.BinaryFormat;
 import org.apache.inlong.manager.pojo.fieldformat.DecimalFormat;
 import org.apache.inlong.manager.pojo.fieldformat.MapFormat;
 import org.apache.inlong.manager.pojo.fieldformat.StructFormat;
 import org.apache.inlong.manager.pojo.fieldformat.StructFormat.Element;
+import org.apache.inlong.manager.pojo.fieldformat.VarBinaryFormat;
 
 import java.util.List;
 
 public class FieldFormatUtils {
 
+    public static String createBinaryFormat(int length) {
+        return JsonUtils.toJsonString(new BinaryFormat(length));
+    }
+
+    public static BinaryFormat parseBinaryFormat(String formatJson) {
+        return JsonUtils.parseObject(formatJson, BinaryFormat.class);
+    }
+
+    public static String createVarBinaryFormat(int length) {
+        return JsonUtils.toJsonString(new VarBinaryFormat(length));
+    }
+
+    public static VarBinaryFormat parseVarBinaryFormat(String formatJson) {
+        return JsonUtils.parseObject(formatJson, VarBinaryFormat.class);
+    }
+
     public static String createDecimalFormat(int precision, int scale) {
         return JsonUtils.toJsonString(new DecimalFormat(precision, scale));
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
index 7c4246485..d1c983058 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
@@ -23,16 +23,18 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.pojo.fieldformat.ArrayFormat;
+import org.apache.inlong.manager.pojo.fieldformat.BinaryFormat;
 import org.apache.inlong.manager.pojo.fieldformat.DecimalFormat;
 import org.apache.inlong.manager.pojo.fieldformat.MapFormat;
 import org.apache.inlong.manager.pojo.fieldformat.StructFormat;
 import org.apache.inlong.manager.pojo.fieldformat.StructFormat.Element;
+import org.apache.inlong.manager.pojo.fieldformat.VarBinaryFormat;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
+import org.apache.inlong.sort.formats.common.BinaryFormatInfo;
 import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
 import org.apache.inlong.sort.formats.common.ByteFormatInfo;
-import org.apache.inlong.sort.formats.common.ByteTypeInfo;
 import org.apache.inlong.sort.formats.common.DateFormatInfo;
 import org.apache.inlong.sort.formats.common.DecimalFormatInfo;
 import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
@@ -47,6 +49,7 @@ import org.apache.inlong.sort.formats.common.ShortFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.common.TimeFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.formats.common.VarBinaryFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.MetaFieldInfo;
 
@@ -240,7 +243,20 @@ public class FieldInfoUtils {
                 break;
             case BINARY:
             case FIXED:
-                formatInfo = new ArrayFormatInfo(ByteTypeInfo::new);
+                if (StringUtils.isNotBlank(format)) {
+                    BinaryFormat binaryFormat = FieldFormatUtils.parseBinaryFormat(format);
+                    formatInfo = new BinaryFormatInfo(binaryFormat.getLength());
+                } else {
+                    formatInfo = new BinaryFormatInfo();
+                }
+                break;
+            case VARBINARY:
+                if (StringUtils.isNotBlank(format)) {
+                    VarBinaryFormat varBinaryFormat = FieldFormatUtils.parseVarBinaryFormat(format);
+                    formatInfo = new VarBinaryFormatInfo(varBinaryFormat.getLength());
+                } else {
+                    formatInfo = new VarBinaryFormatInfo();
+                }
                 break;
             case ARRAY:
                 formatInfo = createArrayFormatInfo(format);
diff --git a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sort/util/FieldFormatUtilsTest.java b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sort/util/FieldFormatUtilsTest.java
index 199b4f5c7..cdc52d0b0 100644
--- a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sort/util/FieldFormatUtilsTest.java
+++ b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sort/util/FieldFormatUtilsTest.java
@@ -20,10 +20,12 @@ package org.apache.inlong.manager.pojo.sort.util;
 import com.google.common.collect.Lists;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.pojo.fieldformat.ArrayFormat;
+import org.apache.inlong.manager.pojo.fieldformat.BinaryFormat;
 import org.apache.inlong.manager.pojo.fieldformat.DecimalFormat;
 import org.apache.inlong.manager.pojo.fieldformat.MapFormat;
 import org.apache.inlong.manager.pojo.fieldformat.StructFormat;
 import org.apache.inlong.manager.pojo.fieldformat.StructFormat.Element;
+import org.apache.inlong.manager.pojo.fieldformat.VarBinaryFormat;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -80,4 +82,18 @@ public class FieldFormatUtilsTest {
         Assertions.assertTrue(format.getElements().get(0).getFieldName().equals("id"));
         Assertions.assertTrue(format.getElements().get(0).getFieldType() == FieldType.INT);
     }
+
+    @Test
+    public void testBinaryFormat() {
+        String formatJson = FieldFormatUtils.createBinaryFormat(5);
+        BinaryFormat format = FieldFormatUtils.parseBinaryFormat(formatJson);
+        Assertions.assertEquals(5, (int) format.getLength());
+    }
+
+    @Test
+    public void testVarBinaryFormat() {
+        String formatJson = FieldFormatUtils.createVarBinaryFormat(5);
+        VarBinaryFormat format = FieldFormatUtils.parseVarBinaryFormat(formatJson);
+        Assertions.assertEquals(5, (int) format.getLength());
+    }
 }
diff --git a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index 7136191a0..932252270 100644
--- a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++ b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -47,6 +47,7 @@ import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TimeType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.Row;
 import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
@@ -88,6 +89,7 @@ import org.apache.inlong.sort.formats.common.TimeTypeInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampTypeInfo;
 import org.apache.inlong.sort.formats.common.TypeInfo;
+import org.apache.inlong.sort.formats.common.VarBinaryFormatInfo;
 import org.apache.inlong.sort.formats.common.VarCharFormatInfo;
 
 import java.util.Arrays;
@@ -313,6 +315,8 @@ public class TableFormatUtils {
             return new RowFormatInfo(fieldNames, fieldFormatInfos);
         } else if (logicalType instanceof BinaryType) {
             return BinaryFormatInfo.INSTANCE;
+        } else if (logicalType instanceof VarBinaryType) {
+            return VarBinaryFormatInfo.INSTANCE;
         } else if (logicalType instanceof NullType) {
             return NullFormatInfo.INSTANCE;
         } else {
@@ -373,7 +377,11 @@ public class TableFormatUtils {
             }
             return RowType.of(logicalTypes, rowFormatInfo.getFieldNames());
         } else if (formatInfo instanceof BinaryFormatInfo) {
-            return new BinaryType();
+            BinaryFormatInfo binaryFormatInfo = (BinaryFormatInfo) formatInfo;
+            return new BinaryType(binaryFormatInfo.getLength());
+        } else if (formatInfo instanceof VarBinaryFormatInfo) {
+            VarBinaryFormatInfo varBinaryFormatInfo = (VarBinaryFormatInfo) formatInfo;
+            return new VarBinaryType(varBinaryFormatInfo.getLength());
         } else if (formatInfo instanceof NullFormatInfo) {
             return new NullType();
         } else {
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryFormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryFormatInfo.java
index b8e577409..2092f70d0 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryFormatInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryFormatInfo.java
@@ -18,6 +18,9 @@
 
 package org.apache.inlong.sort.formats.common;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.nio.charset.StandardCharsets;
 
 public class BinaryFormatInfo implements BasicFormatInfo<byte[]> {
@@ -26,6 +29,30 @@ public class BinaryFormatInfo implements BasicFormatInfo<byte[]> {
 
     public static final BinaryFormatInfo INSTANCE = new BinaryFormatInfo();
 
+    public static final int MIN_LENGTH = 1;
+
+    public static final int MAX_LENGTH = Integer.MAX_VALUE;
+
+    public static final int DEFAULT_LENGTH = 1;
+
+    @JsonProperty("length")
+    private int length;
+
+    public BinaryFormatInfo() {
+        this(DEFAULT_LENGTH);
+    }
+
+    @JsonCreator
+    public BinaryFormatInfo(@JsonProperty("length") int length) {
+        if (length < MIN_LENGTH) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Binary string length must be between %d and %d (both inclusive).",
+                            MIN_LENGTH, MAX_LENGTH));
+        }
+        this.length = length;
+    }
+
     @Override
     public String serialize(byte[] record) throws Exception {
         return new String(record, StandardCharsets.UTF_8);
@@ -60,4 +87,11 @@ public class BinaryFormatInfo implements BasicFormatInfo<byte[]> {
         return "BinaryFormatInfo";
     }
 
+    public int getLength() {
+        return length;
+    }
+
+    public void setLength(int length) {
+        this.length = length;
+    }
 }
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
index 45931cf4f..549e991f0 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
@@ -47,7 +47,8 @@ import java.io.Serializable;
         @JsonSubTypes.Type(name = "row", value = RowFormatInfo.class),
         @JsonSubTypes.Type(name = "binary", value = BinaryFormatInfo.class),
         @JsonSubTypes.Type(name = "null", value = NullFormatInfo.class),
-        @JsonSubTypes.Type(name = "local_zoned_timestamp", value = LocalZonedTimestampFormatInfo.class)
+        @JsonSubTypes.Type(name = "local_zoned_timestamp", value = LocalZonedTimestampFormatInfo.class),
+        @JsonSubTypes.Type(name = "varbinary", value = VarBinaryFormatInfo.class)
 })
 public interface FormatInfo extends Serializable {
 
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/VarBinaryFormatInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/VarBinaryFormatInfo.java
new file mode 100644
index 000000000..0ed1a4dc0
--- /dev/null
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/VarBinaryFormatInfo.java
@@ -0,0 +1,112 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * The format information for VarBinary.
+ */
+public class VarBinaryFormatInfo implements BasicFormatInfo<byte[]> {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final VarBinaryFormatInfo INSTANCE = new VarBinaryFormatInfo();
+
+    public static final int MIN_LENGTH = 1;
+
+    public static final int MAX_LENGTH = Integer.MAX_VALUE;
+
+    public static final int DEFAULT_LENGTH = Integer.MAX_VALUE;
+
+    @JsonProperty("length")
+    private int length;
+
+    public VarBinaryFormatInfo() {
+        this(DEFAULT_LENGTH);
+    }
+
+    @JsonCreator
+    public VarBinaryFormatInfo(@JsonProperty("length") int length) {
+        if (length < MIN_LENGTH) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "VarBinary string length must be between %d and %d (both inclusive).",
+                            MIN_LENGTH, MAX_LENGTH));
+        }
+        this.length = length;
+    }
+
+    /**
+     * Serializes the given record.
+     *
+     * @param record The record to be serialized.
+     * @return The text serialized from the given record.
+     */
+    @Override
+    public String serialize(byte[] record) throws Exception {
+        return new String(record, StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Deserializes the given text.
+     *
+     * @param text The text to be deserialized.
+     * @return The record deserialized from the given text.
+     */
+    @Override
+    public byte[] deserialize(String text) throws Exception {
+        return text.getBytes(StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Returns the type represented by this format.
+     *
+     * @return the type represented by this format.
+     */
+    @Override
+    public TypeInfo getTypeInfo() {
+        return VarBinaryTypeInfo.INSTANCE;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+
+    public int getLength() {
+        return length;
+    }
+
+    public void setLength(int length) {
+        this.length = length;
+    }
+}
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/VarBinaryTypeInfo.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/VarBinaryTypeInfo.java
new file mode 100644
index 000000000..9ba5efc81
--- /dev/null
+++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/VarBinaryTypeInfo.java
@@ -0,0 +1,49 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+/**
+ * The type information for VarBinary.
+ */
+public class VarBinaryTypeInfo implements TypeInfo {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final VarBinaryTypeInfo INSTANCE = new VarBinaryTypeInfo();
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "VarBinaryTypeInfo";
+    }
+
+}
diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryFormatInfo.java b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/VarBinaryFormatInfoTest.java
similarity index 50%
copy from inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryFormatInfo.java
copy to inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/VarBinaryFormatInfoTest.java
index b8e577409..d76d012b2 100644
--- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/BinaryFormatInfo.java
+++ b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/VarBinaryFormatInfoTest.java
@@ -18,46 +18,36 @@
 
 package org.apache.inlong.sort.formats.common;
 
-import java.nio.charset.StandardCharsets;
-
-public class BinaryFormatInfo implements BasicFormatInfo<byte[]> {
-
-    private static final long serialVersionUID = 8379022656220694709L;
-
-    public static final BinaryFormatInfo INSTANCE = new BinaryFormatInfo();
+import org.junit.Test;
 
-    @Override
-    public String serialize(byte[] record) throws Exception {
-        return new String(record, StandardCharsets.UTF_8);
-    }
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
 
-    @Override
-    public byte[] deserialize(String text) throws Exception {
-        return text.getBytes(StandardCharsets.UTF_8);
-    }
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
-    @Override
-    public TypeInfo getTypeInfo() {
-        return BinaryTypeInfo.INSTANCE;
-    }
+public class VarBinaryFormatInfoTest extends FormatInfoTestBase {
 
     @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-
-        return o != null && getClass() == o.getClass();
+    Collection<FormatInfo> createFormatInfos() {
+        return Collections.singletonList(VarBinaryFormatInfo.INSTANCE);
     }
 
-    @Override
-    public int hashCode() {
-        return getClass().hashCode();
+    @Test
+    public void testSerialize() throws Exception {
+        assertEquals(
+                "testString",
+                VarBinaryFormatInfo.INSTANCE.serialize("testString".getBytes(StandardCharsets.UTF_8))
+        );
     }
 
-    @Override
-    public String toString() {
-        return "BinaryFormatInfo";
+    @Test
+    public void testDeserialize() throws Exception {
+        assertArrayEquals(
+                "testString".getBytes(StandardCharsets.UTF_8),
+                VarBinaryFormatInfo.INSTANCE.deserialize("testString")
+        );
     }
 
 }