You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/05/11 09:02:11 UTC

[incubator-inlong] branch master updated: [InLong-4066][Sort] Import inlong format to parse InLongMsg (#4148)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 158cf5548 [InLong-4066][Sort] Import inlong format to parse InLongMsg (#4148)
158cf5548 is described below

commit 158cf554847948baa020ddbd0d4ee61ba830db83
Author: thexia <37...@users.noreply.github.com>
AuthorDate: Wed May 11 17:02:07 2022 +0800

    [InLong-4066][Sort] Import inlong format to parse InLongMsg (#4148)
---
 .../sort-formats/format-inlongmsg-base/pom.xml     |  12 ++
 .../formats/inlongmsg/InLongMsgDecodingFormat.java | 178 ++++++++++++++++++++
 .../inlongmsg/InLongMsgDeserializationSchema.java  | 163 +++++++++++++++++++
 .../formats/inlongmsg/InLongMsgFormatFactory.java  |  92 +++++++++++
 .../sort/formats/inlongmsg/InLongMsgOptions.java   |  51 ++++++
 .../sort/formats/inlongmsg/InLongMsgUtils.java     |  33 ++++
 .../org.apache.flink.table.factories.Factory       |  16 ++
 .../inlongmsg/InLongMsgFormatFactoryTest.java      | 124 ++++++++++++++
 .../inlongmsg/InLongMsgRowDataSerDeTest.java       | 179 +++++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |  16 ++
 .../src/test/resources/log4j-test.properties       |  25 +++
 .../InLongMsgCsvFormatDeserializer.java            |   2 +-
 .../InLongMsgCsvMixedFormatDeserializer.java       |   2 +-
 .../formats/inlongmsgcsv/InLongMsgCsvUtils.java    |  42 -----
 inlong-sort/sort-single-tenant/pom.xml             |  42 +++--
 15 files changed, 917 insertions(+), 60 deletions(-)

diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
index a36d6bf14..49edb5358 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
@@ -87,6 +87,18 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-csv</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
new file mode 100644
index 000000000..eca346cca
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsg;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource.Context;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link DecodingFormat} that wraps an inner decoding format and exposes additional metadata
+ * columns that are read from an {@link InLongMsgHead}.
+ *
+ * <p>Metadata keys that start with the inner format prefix are pushed down to the inner format
+ * (with the prefix stripped); all remaining keys are resolved against {@link ReadableMetadata}.
+ */
+public class InLongMsgDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+
+    /** Prefix that marks a metadata key as belonging to the inner format. */
+    private final String innerFormatMetaPrefix;
+
+    /** Wrapped format whose runtime decoder is handed to {@link InLongMsgDeserializationSchema}. */
+    private final DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat;
+
+    /** Metadata keys of this format (inner format keys excluded), in the order they were requested. */
+    private List<String> metadataKeys;
+
+    /** Passed through unchanged to {@link InLongMsgDeserializationSchema}. */
+    private final boolean ignoreErrors;
+
+    /**
+     * Creates the decoding format.
+     *
+     * @param innerDecodingFormat   format to wrap
+     * @param innerFormatMetaPrefix prefix identifying metadata keys of the inner format
+     * @param ignoreErrors          error handling flag forwarded to the runtime decoder
+     */
+    public InLongMsgDecodingFormat(
+            DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat,
+            String innerFormatMetaPrefix,
+            boolean ignoreErrors) {
+        this.innerDecodingFormat = innerDecodingFormat;
+        this.innerFormatMetaPrefix = innerFormatMetaPrefix;
+        this.metadataKeys = Collections.emptyList();
+        this.ignoreErrors = ignoreErrors;
+    }
+
+    /**
+     * Creates the runtime decoder producing the physical columns followed by the requested
+     * metadata columns of this format.
+     *
+     * @param context          planner context used to create type information
+     * @param physicalDataType data type of the physical columns
+     * @return the wrapping {@link InLongMsgDeserializationSchema}
+     * @throws IllegalStateException if a requested metadata key is not a {@link ReadableMetadata} key
+     */
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(Context context, DataType physicalDataType) {
+        // Resolve the requested keys once, keeping the requested order. Both the appended fields
+        // and the converters are derived from this list so that converter i always produces the
+        // value of appended field i, whatever order the keys were requested in.
+        final List<ReadableMetadata> readableMetadata =
+                metadataKeys.stream()
+                        .map(InLongMsgDecodingFormat::findMetadata)
+                        .collect(Collectors.toList());
+        final MetadataConverter[] metadataConverters =
+                readableMetadata.stream()
+                        .map(m -> m.converter)
+                        .toArray(MetadataConverter[]::new);
+        final List<DataTypes.Field> metadataFields =
+                readableMetadata.stream()
+                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                        .collect(Collectors.toList());
+        // NOTE(review): only this format's metadata is appended here; metadata pushed down to the
+        // inner format in applyReadableMetadata is not part of producedDataType - confirm intended.
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        return new InLongMsgDeserializationSchema(
+                innerDecodingFormat.createRuntimeDecoder(context, physicalDataType),
+                metadataConverters,
+                producedTypeInfo,
+                ignoreErrors);
+    }
+
+    /**
+     * Lists the metadata of the inner format (keys prefixed with the inner format prefix)
+     * followed by the metadata of this format. An already present key is never overwritten.
+     */
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+        // add inner format metadata with prefix
+        innerDecodingFormat
+                .listReadableMetadata()
+                .forEach((key, value) -> metadataMap.putIfAbsent(innerFormatMetaPrefix + key, value));
+
+        // add format metadata
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
+
+        return metadataMap;
+    }
+
+    /**
+     * Splits the requested keys into inner format keys (those starting with the inner format
+     * prefix) and keys of this format, remembers the latter and pushes the former down.
+     *
+     * @param metadataKeys requested metadata keys
+     */
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys) {
+        // separate inner format and format metadata
+        final List<String> innerFormatMetadataKeys =
+                metadataKeys.stream()
+                        .filter(k -> k.startsWith(innerFormatMetaPrefix))
+                        .collect(Collectors.toList());
+        final List<String> formatMetadataKeys = new ArrayList<>(metadataKeys);
+        formatMetadataKeys.removeAll(innerFormatMetadataKeys);
+        this.metadataKeys = formatMetadataKeys;
+
+        // push down inner format metadata, but only if the inner format exposes any
+        final Map<String, DataType> formatMetadata = innerDecodingFormat.listReadableMetadata();
+        if (!formatMetadata.isEmpty()) {
+            final List<String> requestedFormatMetadataKeys =
+                    innerFormatMetadataKeys.stream()
+                            .map(k -> k.substring(innerFormatMetaPrefix.length()))
+                            .collect(Collectors.toList());
+            innerDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        }
+    }
+
+    /** Returns the changelog mode of the inner format. */
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return innerDecodingFormat.getChangelogMode();
+    }
+
+    /**
+     * Looks up the {@link ReadableMetadata} declared for the given key.
+     *
+     * @throws IllegalStateException if no metadata with that key exists
+     */
+    private static ReadableMetadata findMetadata(String key) {
+        return Arrays.stream(ReadableMetadata.values())
+                .filter(m -> m.key.equals(key))
+                .findFirst()
+                .orElseThrow(() -> new IllegalStateException(
+                        "Unsupported InLongMsg metadata key '" + key + "'."));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    /** Metadata columns of this format, each with its key, data type and head converter. */
+    enum ReadableMetadata {
+        /** Value of {@code head.getTime()} converted to {@link TimestampData}. */
+        CREATE_TIME(
+                "create-time",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(InLongMsgHead head) {
+                        return TimestampData.fromTimestamp(head.getTime());
+                    }
+                }),
+
+        /** Value of {@code head.getTid()} converted to {@link StringData}. */
+        STREAM_ID(
+                "stream-id",
+                DataTypes.STRING().notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(InLongMsgHead head) {
+                        return StringData.fromString(head.getTid());
+                    }
+                });
+
+        final String key;
+
+        final DataType dataType;
+
+        final MetadataConverter converter;
+
+        ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+}
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
new file mode 100644
index 000000000..3001bd8ec
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsg;
+
+import com.google.common.base.Objects;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.common.msg.InLongMsg;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * {@link DeserializationSchema} that unpacks an {@link InLongMsg}, hands every non-empty body to
+ * an inner schema and appends metadata columns read from the parsed {@link InLongMsgHead} to
+ * each row the inner schema produces.
+ */
+public class InLongMsgDeserializationSchema implements DeserializationSchema<RowData> {
+
+    /** Inner {@link DeserializationSchema} to deserialize {@link InLongMsg} inner packaged
+     *  data buffer message */
+    private final DeserializationSchema<RowData> deserializationSchema;
+
+    /** {@link MetadataConverter} of how to produce metadata from {@link InLongMsg}. */
+    private final MetadataConverter[] metadataConverters;
+
+    /** {@link TypeInformation} of the produced {@link RowData} (physical + meta data). */
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    /** If {@code true}, attribute groups whose head cannot be parsed are skipped instead of failing. */
+    private final boolean ignoreErrors;
+
+    /**
+     * Creates the schema.
+     *
+     * @param schema             inner schema used for every message body
+     * @param metadataConverters converters producing the appended metadata columns, in column order
+     * @param producedTypeInfo   type information returned by {@link #getProducedType()}
+     * @param ignoreErrors       whether unparsable heads are skipped instead of raising an error
+     */
+    public InLongMsgDeserializationSchema(
+            DeserializationSchema<RowData> schema,
+            MetadataConverter[] metadataConverters,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean ignoreErrors) {
+        this.deserializationSchema = schema;
+        this.metadataConverters = metadataConverters;
+        this.producedTypeInfo = producedTypeInfo;
+        this.ignoreErrors = ignoreErrors;
+    }
+
+    /** Forwards initialization to the inner schema so that it is opened before its first use. */
+    @Override
+    public void open(DeserializationSchema.InitializationContext context) throws Exception {
+        deserializationSchema.open(context);
+    }
+
+    /** Not supported: one InLongMsg may contain many rows, use the collector based variant. */
+    @Override
+    public RowData deserialize(byte[] bytes) throws IOException {
+        throw new RuntimeException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
+    }
+
+    /**
+     * Parses {@code message} as an {@link InLongMsg} and emits one row per record produced by the
+     * inner schema for every non-empty body.
+     *
+     * @param message serialized InLongMsg
+     * @param out     collector receiving the produced rows
+     * @throws IOException if the message or a head cannot be parsed and errors are not ignored,
+     *                     or if the inner schema fails
+     */
+    @Override
+    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
+        InLongMsg inLongMsg = InLongMsg.parseFrom(message);
+        if (inLongMsg == null) {
+            // NOTE(review): parseFrom presumably returns null for bytes it does not recognise -
+            // confirm; without this guard such input would surface as a NullPointerException.
+            if (ignoreErrors) {
+                return;
+            }
+            throw new IOException("Failed to parse InLongMsg from the given message.");
+        }
+
+        for (String attr : inLongMsg.getAttrs()) {
+            final InLongMsgHead head;
+            try {
+                head = InLongMsgUtils.parseHead(attr);
+            } catch (Exception e) {
+                // Only exceptions are handled here; JVM errors must never be silently skipped.
+                if (ignoreErrors) {
+                    continue;
+                }
+                // The payload is binary, so report the attribute string that failed to parse.
+                throw new IOException(
+                        "Failed to deserialize InLongMsg row, unparsable attributes '" + attr + "'.", e);
+            }
+
+            Iterator<byte[]> iterator = inLongMsg.getIterator(attr);
+            if (iterator == null) {
+                continue;
+            }
+
+            while (iterator.hasNext()) {
+                byte[] bodyBytes = iterator.next();
+                if (bodyBytes == null || bodyBytes.length == 0) {
+                    continue;
+                }
+
+                // Buffer the inner rows so that metadata can be appended before emitting them.
+                List<RowData> rows = new ArrayList<>();
+                deserializationSchema.deserialize(bodyBytes, new ListCollector<>(rows));
+                for (RowData row : rows) {
+                    emitRow(head, (GenericRowData) row, out);
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean isEndOfStream(RowData rowData) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedTypeInfo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof InLongMsgDeserializationSchema)) {
+            return false;
+        }
+        InLongMsgDeserializationSchema that = (InLongMsgDeserializationSchema) o;
+        return ignoreErrors == that.ignoreErrors
+                && Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()),
+                        Arrays.stream(that.metadataConverters).collect(Collectors.toList()))
+                && Objects.equal(deserializationSchema, that.deserializationSchema)
+                && Objects.equal(producedTypeInfo, that.producedTypeInfo);
+    }
+
+    @Override
+    public int hashCode() {
+        // Hash the converters element-wise: equals() compares their contents, whereas passing the
+        // array itself would hash it by identity and break the equals/hashCode contract.
+        return Objects.hashCode(
+                deserializationSchema, Arrays.hashCode(metadataConverters), producedTypeInfo, ignoreErrors);
+    }
+
+    /** Produces the value of one metadata column from a parsed {@link InLongMsgHead}. */
+    interface MetadataConverter extends Serializable {
+        Object read(InLongMsgHead head);
+    }
+
+    /**
+     * Emits {@code physicalRow} followed by one field per metadata converter. The physical row is
+     * emitted unchanged when there are no converters.
+     */
+    private void emitRow(InLongMsgHead head, GenericRowData physicalRow, Collector<RowData> out) {
+        if (metadataConverters.length == 0) {
+            out.collect(physicalRow);
+            return;
+        }
+        final int physicalArity = physicalRow.getArity();
+        final int metadataArity = metadataConverters.length;
+        final GenericRowData producedRow =
+                new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
+        for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
+            producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
+        }
+        for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+            producedRow.setField(
+                    physicalArity + metadataPos, metadataConverters[metadataPos].read(head));
+        }
+        out.collect(producedRow);
+    }
+}
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
new file mode 100644
index 000000000..bdc3d3059
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsg;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.INNER_FORMAT;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.validateDecodingFormatOptions;
+
+/**
+ * Format factory registered under the {@code inlong} identifier. It discovers the inner
+ * deserialization format named by the {@code inner.format} option and wraps it in an
+ * {@link InLongMsgDecodingFormat}. Encoding is not supported.
+ */
+public final class InLongMsgFormatFactory
+        implements DeserializationFormatFactory, SerializationFormatFactory {
+
+    public static final String IDENTIFIER = "inlong";
+
+    public static final String INLONG_PREFIX = "inlong.";
+
+    /**
+     * Builds the decoding format. The inner format is configured from the table options that
+     * start with {@code inlong.<inner format identifier>.}.
+     */
+    @Override
+    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(Context context,
+            ReadableConfig formatOptions) {
+        validateDecodingFormatOptions(formatOptions);
+
+        final String innerIdentifier = formatOptions.get(INNER_FORMAT);
+        final String metaPrefix = innerIdentifier + ".";
+
+        final DeserializationFormatFactory innerFactory = FactoryUtil.discoverFactory(
+                context.getClassLoader(),
+                DeserializationFormatFactory.class,
+                innerIdentifier);
+
+        // Expose only the options below "inlong.<inner format>." to the inner factory.
+        final Configuration tableOptions = Configuration.fromMap(context.getCatalogTable().getOptions());
+        final ReadableConfig innerOptions = new DelegatingConfiguration(tableOptions, INLONG_PREFIX + metaPrefix);
+
+        return new InLongMsgDecodingFormat(
+                innerFactory.createDecodingFormat(context, innerOptions),
+                metaPrefix,
+                formatOptions.get(IGNORE_PARSE_ERRORS));
+    }
+
+    /** Always fails: serialization is not supported by this format. */
+    @Override
+    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(Context context,
+            ReadableConfig formatOptions) {
+        throw new RuntimeException("Do not support inlong format serialize.");
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /** Returns a set containing only the inner format option. */
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> required = new HashSet<>();
+        required.add(INNER_FORMAT);
+        return required;
+    }
+
+    /** Returns a set containing only the ignore-parse-errors option. */
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> optional = new HashSet<>();
+        optional.add(IGNORE_PARSE_ERRORS);
+        return optional;
+    }
+}
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
new file mode 100644
index 000000000..61320fbe3
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsg;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+/** Config options of the inlong format together with their validation. */
+public class InLongMsgOptions {
+
+    /** Identifier of the inner format; has no default value. */
+    public static final ConfigOption<String> INNER_FORMAT =
+            ConfigOptions.key("inner.format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Defines the format identifier for encoding attr data. \n"
+                            + "The identifier is used to discover a suitable format factory.");
+
+    /** Error handling flag; defaults to {@code false}. */
+    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
+            ConfigOptions.key("ignore-parse-errors")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n"
+                                        + "fields are set to null in case of errors");
+
+    /** Not instantiable, this class only holds constants and static helpers. */
+    private InLongMsgOptions() {
+    }
+
+    /**
+     * Checks that the inner format option is set.
+     *
+     * @param config format options to validate
+     * @throws ValidationException if the inner format option has no value
+     */
+    public static void validateDecodingFormatOptions(ReadableConfig config) {
+        if (config.get(INNER_FORMAT) != null) {
+            return;
+        }
+        throw new ValidationException(INNER_FORMAT.key() + " shouldn't be null.");
+    }
+}
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index e3569a4e9..4236d5dde 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -108,6 +108,39 @@ public class InLongMsgUtils {
         }
     }
 
+    /**
+     * Builds an {@link InLongMsgHead} from a raw attribute string.
+     *
+     * <p>The stream id attribute is mandatory. The time is read from the {@code INLONGMSG_ATTR_TIME_T}
+     * attribute if present, otherwise from {@code INLONGMSG_ATTR_TIME_DT}; one of them must exist.
+     *
+     * @param attr raw attribute string, split into key/value pairs by {@code parseAttr}
+     * @return the head holding the attributes, stream id, time and predefined fields
+     * @throws IllegalArgumentException if the stream id or both time attributes are missing
+     */
+    public static InLongMsgHead parseHead(String attr) {
+        final Map<String, String> attributes = parseAttr(attr);
+
+        // The stream id is required.
+        if (!attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) {
+            throw new IllegalArgumentException("Could not find " + INLONGMSG_ATTR_STREAM_ID + " in attributes!");
+        }
+        final String streamId = attributes.get(INLONGMSG_ATTR_STREAM_ID);
+
+        // The time attribute holding a date takes precedence over the one holding an epoch value.
+        final Timestamp time;
+        if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
+            time = parseDateTime(attributes.get(INLONGMSG_ATTR_TIME_T).trim());
+        } else if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) {
+            time = parseEpochTime(attributes.get(INLONGMSG_ATTR_TIME_DT).trim());
+        } else {
+            throw new IllegalArgumentException(
+                    "Could not find " + INLONGMSG_ATTR_TIME_T
+                            + " or " + INLONGMSG_ATTR_TIME_DT + " in attributes!");
+        }
+
+        return new InLongMsgHead(attributes, streamId, time, getPredefinedFields(attributes));
+    }
+
+
     public static Map<String, String> parseAttr(String attr) {
         return StringUtils.splitKv(
                 attr,
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..d2c119926
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.formats.inlongmsg.InLongMsgFormatFactory
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactoryTest.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactoryTest.java
new file mode 100644
index 000000000..e48cf774d
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactoryTest.java
@@ -0,0 +1,124 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsg;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema;
+import org.apache.flink.formats.csv.CsvRowSchemaConverter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter;
+import org.junit.Test;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_TYPE;
+import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.junit.Assert.assertEquals;
+
+public class InLongMsgFormatFactoryTest {
+
+    /** Verifies that the schema created from the user-defined options equals a hand-built expected schema. */
+    @Test
+    public void testUserDefinedOptions()
+            throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
+        final Map<String, String> tableOptions = getModifiedOptions(opts -> {
+            opts.put("inlong.inner.format", "csv");
+            opts.put("inlong.ignore-parse-errors", "true");
+        });
+
+        // test Deser: build the expected inner CSV schema reflectively (getDeclaredConstructor + setAccessible)
+        Constructor<CsvRowDataDeserializationSchema> constructor = CsvRowDataDeserializationSchema.class
+                .getDeclaredConstructor(RowType.class, TypeInformation.class, CsvSchema.class, boolean.class);
+        constructor.setAccessible(true);
+        DeserializationSchema<RowData> deserializationSchema = constructor.newInstance(
+                PHYSICAL_TYPE, InternalTypeInfo.of(PHYSICAL_TYPE),
+                CsvRowSchemaConverter.convert(PHYSICAL_TYPE), false);
+
+        // NOTE(review): the trailing "true" presumably mirrors "inlong.ignore-parse-errors" - confirm against ctor
+        InLongMsgDeserializationSchema expectedDeser = new InLongMsgDeserializationSchema(
+                deserializationSchema, new MetadataConverter[0], InternalTypeInfo.of(PHYSICAL_TYPE), true);
+        DeserializationSchema<RowData> actualDeser = createDeserializationSchema(tableOptions, SCHEMA);
+        assertEquals(expectedDeser, actualDeser);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Public Tools
+    // ------------------------------------------------------------------------
+    /** Creates the mock table source for {@code options} and returns the runtime decoder of its value format. */
+    public static DeserializationSchema<RowData> createDeserializationSchema(
+            Map<String, String> options, ResolvedSchema schema) {
+        final DynamicTableSource tableSource = createTableSource(schema, options);
+        assert tableSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;
+
+        final TestDynamicTableFactory.DynamicTableSourceMock sourceMock =
+                (TestDynamicTableFactory.DynamicTableSourceMock) tableSource;
+        return sourceMock.valueFormat.createRuntimeDecoder(
+                ScanRuntimeProviderContext.INSTANCE, schema.toPhysicalRowDataType());
+    }
+
+    /** Creates the mock table sink for {@code options} and returns the runtime encoder of its value format. */
+    public static SerializationSchema<RowData> createSerializationSchema(
+            Map<String, String> options, ResolvedSchema schema) {
+        final DynamicTableSink tableSink = createTableSink(schema, options);
+        assert tableSink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
+
+        final TestDynamicTableFactory.DynamicTableSinkMock mockedSink =
+                (TestDynamicTableFactory.DynamicTableSinkMock) tableSink;
+        return mockedSink.valueFormat.createRuntimeEncoder(
+                new SinkRuntimeProviderContext(false), schema.toPhysicalRowDataType());
+    }
+
+    /**
+     * Builds the default option map and hands it to {@code optionModifier} before returning it.
+     * @param optionModifier callback that receives the mutable default options and may change them
+     * @return the option map after the callback has run
+     */
+    public static Map<String, String> getModifiedOptions(Consumer<Map<String, String>> optionModifier) {
+        final Map<String, String> modified = getAllOptions();
+        optionModifier.accept(modified);
+        return modified;
+    }
+
+    private static Map<String, String> getAllOptions() {
+        final Map<String, String> baseline = new HashMap<>(); // mutable: getModifiedOptions lets callers edit it
+        baseline.put("connector", TestDynamicTableFactory.IDENTIFIER);
+        baseline.put("target", "MyTarget");
+        baseline.put("buffer-size", "1000");
+        baseline.put("format", "inlong"); // value format under test
+        return baseline;
+    }
+
+}
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
new file mode 100644
index 000000000..2d4c89737
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsg;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.factories.utils.FactoryMocks;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.inlong.common.msg.InLongMsg;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+
+public class InLongMsgRowDataSerDeTest {
+
+    /** Deserializes two InLongMsg payloads carrying CSV bodies and checks the rows emitted for them. */
+    @Test
+    public void testDeserializeInLongMsg() throws Exception {
+        // mock data: two messages with two CSV records each
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg();
+        inLongMsg1.addMsg("streamId=HAHA&t=202201011112",
+                "1,asdqw".getBytes(StandardCharsets.UTF_8));
+        inLongMsg1.addMsg("streamId=xixi&t=202201011112",
+                "2,testData".getBytes(StandardCharsets.UTF_8));
+        InLongMsg inLongMsg2 = InLongMsg.newInLongMsg();
+        inLongMsg2.addMsg("streamId=oooo&t=202201011112",
+                "3,dwqdqw".getBytes(StandardCharsets.UTF_8));
+        inLongMsg2.addMsg("streamId=bubub&t=202201011112",
+                "4,asdqdqwe".getBytes(StandardCharsets.UTF_8));
+        List<InLongMsg> inLongMsgs = Stream.of(inLongMsg1, inLongMsg2).collect(Collectors.toList());
+        List<byte[]> input = inLongMsgs.stream()
+                .map(InLongMsg::buildArray)
+                .collect(Collectors.toList());
+        final List<RowData> expectedOutput = Stream.of(
+                GenericRowData.of(1L, BinaryStringData.fromString("asdqw")),
+                GenericRowData.of(2L, BinaryStringData.fromString("testData")),
+                GenericRowData.of(3L, BinaryStringData.fromString("dwqdqw")),
+                GenericRowData.of(4L, BinaryStringData.fromString("asdqdqwe"))
+        ).collect(Collectors.toList());
+
+        // deserialize
+        final Map<String, String> tableOptions = InLongMsgFormatFactoryTest.getModifiedOptions(
+                opts -> opts.put("inlong.inner.format", "csv"));
+        ResolvedSchema schema = ResolvedSchema.of(
+                Column.physical("id", DataTypes.BIGINT()),
+                Column.physical("name", DataTypes.STRING())
+        );
+        DeserializationSchema<RowData> inLongMsgDeserializationSchema =
+                InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema);
+        List<RowData> deData = new ArrayList<>();
+        ListCollector<RowData> out = new ListCollector<>(deData);
+        for (byte[] bytes : input) {
+            inLongMsgDeserializationSchema.deserialize(bytes, out);
+        }
+
+        // rows are expected in the order the records were added, across both messages
+        assertEquals(expectedOutput, deData);
+    }
+
+    /** Expects no rows for a message with the attribute string "asdq" when parse errors are ignored. */
+    @Test
+    public void testDeserializeInLongMsgWithError() throws Exception {
+        // mock data: one record attached to an attribute string that has no key=value pairs
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg();
+        inLongMsg1.addMsg("asdq", "1, asd".getBytes(StandardCharsets.UTF_8));
+        List<byte[]> input = Stream.of(inLongMsg1)
+                .map(InLongMsg::buildArray)
+                .collect(Collectors.toList());
+
+        // deserialize with the CSV inner format and "inlong.ignore-parse-errors" switched on
+        final Map<String, String> tableOptions = InLongMsgFormatFactoryTest.getModifiedOptions(opts -> {
+            opts.put("inlong.inner.format", "csv");
+            opts.put("inlong.ignore-parse-errors", "true");
+        });
+        ResolvedSchema schema = ResolvedSchema.of(
+                Column.physical("id", DataTypes.BIGINT()),
+                Column.physical("name", DataTypes.STRING()));
+        DeserializationSchema<RowData> inLongMsgDeserializationSchema =
+                InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema);
+
+        final List<RowData> collected = new ArrayList<>();
+        final ListCollector<RowData> collector = new ListCollector<>(collected);
+        for (byte[] payload : input) {
+            inLongMsgDeserializationSchema.deserialize(payload, collector);
+        }
+
+        // nothing may reach the collector
+        assertEquals(Collections.emptyList(), collected);
+    }
+
+    /** Deserializes two InLongMsg payloads and checks that the "create-time" metadata is appended to every row. */
+    @Test
+    public void testDeserializeInLongMsgWithMetadata() throws Exception {
+        // mock data: two messages with two CSV records each; the "dt" values reappear as the expected timestamps
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg();
+        inLongMsg1.addMsg("streamId=HAHA&dt=1652153467000",
+                "1,asdqw".getBytes(StandardCharsets.UTF_8));
+        inLongMsg1.addMsg("streamId=xixi&dt=1652153467000",
+                "2,testData".getBytes(StandardCharsets.UTF_8));
+        InLongMsg inLongMsg2 = InLongMsg.newInLongMsg();
+        inLongMsg2.addMsg("streamId=oooo&dt=1652153468000",
+                "3,dwqdqw".getBytes(StandardCharsets.UTF_8));
+        inLongMsg2.addMsg("streamId=bubub&dt=1652153469000",
+                "4,asdqdqwe".getBytes(StandardCharsets.UTF_8));
+        List<InLongMsg> inLongMsgs = Stream.of(inLongMsg1, inLongMsg2).collect(Collectors.toList());
+        List<byte[]> input = inLongMsgs.stream()
+                .map(InLongMsg::buildArray)
+                .collect(Collectors.toList());
+        final List<RowData> expectedOutput = Stream.of(
+                GenericRowData.of(1L, BinaryStringData.fromString("asdqw"),
+                        TimestampData.fromTimestamp(new Timestamp(1652153467000L))),
+                GenericRowData.of(2L, BinaryStringData.fromString("testData"),
+                        TimestampData.fromTimestamp(new Timestamp(1652153467000L))),
+                GenericRowData.of(3L, BinaryStringData.fromString("dwqdqw"),
+                        TimestampData.fromTimestamp(new Timestamp(1652153468000L))),
+                GenericRowData.of(4L, BinaryStringData.fromString("asdqdqwe"),
+                        TimestampData.fromTimestamp(new Timestamp(1652153469000L)))
+        ).collect(Collectors.toList());
+
+        // deserialize: the options go straight to the format factory, so the key carries no "inlong." prefix
+        final Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put("inner.format", "csv");
+        ResolvedSchema schema = ResolvedSchema.of(
+                Column.physical("id", DataTypes.BIGINT()),
+                Column.physical("name", DataTypes.STRING()),
+                Column.metadata("time", DataTypes.TIMESTAMP(3), "create-time", false));
+
+        // apply metadata
+        InLongMsgFormatFactory factory = new InLongMsgFormatFactory();
+        DecodingFormat<DeserializationSchema<RowData>> decodingFormat = factory.createDecodingFormat(
+                FactoryMocks.createTableContext(schema, tableOptions), Configuration.fromMap(tableOptions));
+        decodingFormat.applyReadableMetadata(Stream.of("create-time").collect(Collectors.toList()));
+        DeserializationSchema<RowData> inLongMsgDeserializationSchema = decodingFormat
+                .createRuntimeDecoder(ScanRuntimeProviderContext.INSTANCE, schema.toPhysicalRowDataType());
+
+        List<RowData> deData = new ArrayList<>();
+        ListCollector<RowData> out = new ListCollector<>(deData);
+        for (byte[] bytes : input) {
+            inLongMsgDeserializationSchema.deserialize(bytes, out);
+        }
+
+        assertEquals(expectedOutput, deData);
+    }
+}
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..dd4af0fb6
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.formats.csv.CsvFormatFactory
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/test/resources/log4j-test.properties b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/resources/log4j-test.properties
new file mode 100644
index 000000000..bf052fa68
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/resources/log4j-test.properties
@@ -0,0 +1,25 @@
+#
+# /*
+#  * Licensed to the Apache Software Foundation (ASF) under one
+#  * or more contributor license agreements.  See the NOTICE file
+#  * distributed with this work for additional information
+#  * regarding copyright ownership.  The ASF licenses this file
+#  * to you under the Apache License, Version 2.0 (the
+#  * "License"); you may not use this file except in compliance
+#  * with the License.  You may obtain a copy of the License at
+#  *
+#  *     http://www.apache.org/licenses/LICENSE-2.0
+#  *
+#  * Unless required by applicable law or agreed to in writing, software
+#  * distributed under the License is distributed on an "AS IS" BASIS,
+#  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  * See the License for the specific language governing permissions and
+#  * limitations under the License.
+#  */
+#
+log4j.rootLogger=OFF, A1
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
index ca2bab06a..9256b018b 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -142,7 +142,7 @@ public final class InLongMsgCsvFormatDeserializer extends AbstractInLongMsgForma
 
     @Override
     protected InLongMsgHead parseHead(String attr) {
-        return InLongMsgCsvUtils.parseHead(attr);
+        return InLongMsgUtils.parseHead(attr);
     }
 
     @Override
diff --git a/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
index 3d2042264..45a4be324 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
@@ -100,7 +100,7 @@ public final class InLongMsgCsvMixedFormatDeserializer extends AbstractInLongMsg
 
     @Override
     protected InLongMsgHead parseHead(String attr) {
-        return InLongMsgCsvUtils.parseHead(attr);
+        return InLongMsgUtils.parseHead(attr);
     }
 
     @Override
diff --git a/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index 84c32bb9a..befc1c702 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -18,14 +18,6 @@
 
 package org.apache.inlong.sort.formats.inlongmsgcsv;
 
-import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
-import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
-import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
-import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
-import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
-import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
-import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
-
 import java.nio.charset.Charset;
 import java.sql.Timestamp;
 import java.util.Arrays;
@@ -37,7 +29,6 @@ import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.formats.common.RowFormatInfo;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
 import org.apache.inlong.sort.formats.util.StringUtils;
 import org.slf4j.Logger;
@@ -53,39 +44,6 @@ public class InLongMsgCsvUtils {
     public static final String FORMAT_DELETE_HEAD_DELIMITER = "format.delete-head-delimiter";
     public static final boolean DEFAULT_DELETE_HEAD_DELIMITER = true;
 
-    public static InLongMsgHead parseHead(String attr) {
-        Map<String, String> attributes = parseAttr(attr);
-
-        // Extracts interface from the attributes.
-        String streamId;
-
-        if (attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) {
-            streamId = attributes.get(INLONGMSG_ATTR_STREAM_ID);
-        } else {
-            throw new IllegalArgumentException("Could not find " + INLONGMSG_ATTR_STREAM_ID + " in attributes!");
-        }
-
-        // Extracts time from the attributes
-        Timestamp time;
-
-        if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
-            String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim();
-            time = parseDateTime(date);
-        } else if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) {
-            String epoch = attributes.get(INLONGMSG_ATTR_TIME_DT).trim();
-            time = parseEpochTime(epoch);
-        } else {
-            throw new IllegalArgumentException(
-                    "Could not find " + INLONGMSG_ATTR_TIME_T
-                            + " or " + INLONGMSG_ATTR_TIME_DT + " in attributes!");
-        }
-
-        // Extracts predefined fields from the attributes
-        List<String> predefinedFields = getPredefinedFields(attributes);
-
-        return new InLongMsgHead(attributes, streamId, time, predefinedFields);
-    }
-
     public static InLongMsgBody parseBody(
             byte[] bytes,
             String charset,
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 1318a6990..d9a7a4af7 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -64,16 +64,6 @@
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-format-base</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-format-json</artifactId>
-            <version>${project.version}</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>audit-sdk</artifactId>
@@ -122,12 +112,8 @@
             <version>1.4.01</version>
         </dependency>
 
-        <!--flink-->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-hive_${flink.scala.binary.version}</artifactId>
-        </dependency>
 
+        <!-- flink format -->
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-json</artifactId>
@@ -143,6 +129,30 @@
             <artifactId>flink-avro</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-inlongmsg-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- flink connector -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-hive_${flink.scala.binary.version}</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-exec</artifactId>
@@ -158,7 +168,7 @@
             </exclusions>
         </dependency>
 
-
+        <!-- flink core -->
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>