You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/23 03:36:54 UTC

[incubator-inlong] branch master updated: [INLONG-2271] move TDMsg to InLongMsg (#2273)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 912a30a  [INLONG-2271] move TDMsg to InLongMsg (#2273)
912a30a is described below

commit 912a30ab7c85eab61d09849e8c5f4c9345e4d876
Author: dockerzhang <do...@apache.org>
AuthorDate: Sun Jan 23 11:36:47 2022 +0800

    [INLONG-2271] move TDMsg to InLongMsg (#2273)
    
    Co-authored-by: dockerzhang(张超) <do...@tencent.com>
---
 LICENSE                                            |   2 +-
 docker/kubernetes/templates/NOTES.txt              |  78 +--
 inlong-audit/audit-store/pom.xml                   |   9 +-
 inlong-audit/pom.xml                               |   8 +-
 .../commons/msg/{TDMsg1.java => InLongMsg.java}    |  48 +-
 ...gAttrBuilder.java => InLongMsgAttrBuilder.java} |  60 +--
 .../dataproxy/http/SimpleMessageHandler.java       |  12 +-
 .../dataproxy/source/ServerMessageHandler.java     |  22 +-
 .../dataproxy/source/SimpleMessageHandler.java     |  22 +-
 .../thirdpart/sort/PushHiveConfigTaskListener.java |   6 +-
 .../deserialization/DeserializationInfo.java       |  10 +-
 ....java => InLongMsgCsv2DeserializationInfo.java} |   6 +-
 ...o.java => InLongMsgCsvDeserializationInfo.java} |   8 +-
 ...Info.java => InLongMsgDeserializationInfo.java} |   6 +-
 ...fo.java => InLongMsgKvDeserializationInfo.java} |   6 +-
 ...va => InLongMsgTlogCsvDeserializationInfo.java} |   6 +-
 ...ava => InLongMsgTlogKvDeserializationInfo.java} |   6 +-
 .../sort/protocol/DeserializationInfoTest.java     |   4 +-
 .../sort/protocol/source/PulsarSourceInfoTest.java |   6 +-
 inlong-sort/sort-core/pom.xml                      |   4 +-
 ...rd.java => InLongMsgMixedSerializedRecord.java} |   8 +-
 .../deserialization/DeserializationSchema.java     |  19 +-
 ...eserializer.java => InLongMsgDeserializer.java} |   8 +-
 ...alizer.java => InLongMsgMixedDeserializer.java} |  35 +-
 .../deserialization/MultiTenancyDeserializer.java  |  21 +-
 ...=> MultiTenancyInLongMsgMixedDeserializer.java} |  83 ++--
 .../flink/tubemq/MultiTenancyTubeConsumer.java     |   4 +-
 .../flink/tubemq/TubeSubscriptionDescription.java  |   8 +-
 .../org/apache/inlong/sort/util/CommonUtils.java   |   6 +-
 ...st.java => InLongMsgMixedDeserializerTest.java} |  38 +-
 ...ultiTenancyInLongMsgMixedDeserializerTest.java} |  36 +-
 .../tubemq/MultiTopicTubeSourceFunctionTest.java   |   4 +-
 .../tubemq/TubeSubscriptionDescriptionTest.java    |   8 +-
 .../pom.xml                                        |   4 +-
 .../AbstractInLongMsgFormatDeserializer.java}      |  38 +-
 .../AbstractInLongMsgMixedFormatDeserializer.java} |  10 +-
 .../sort/formats/inlongmsg/InLongMsgBody.java}     |  16 +-
 .../sort/formats/inlongmsg/InLongMsgHead.java}     |  15 +-
 .../inlongmsg/InLongMsgMixedFormatConverter.java}  |   6 +-
 .../inlongmsg/InLongMsgMixedFormatFactory.java}    |  12 +-
 .../inlongmsg/InLongMsgMixedValidator.java}        |   6 +-
 .../sort/formats/inlongmsg/InLongMsgUtils.java}    |  38 +-
 .../formats/inlongmsg/InLongMsgValidator.java}     |   6 +-
 .../pom.xml                                        |   6 +-
 .../sort/formats/inlongmsgcsv/InLongMsgCsv.java}   |  36 +-
 .../InLongMsgCsvFormatDeserializer.java}           |  40 +-
 .../inlongmsgcsv/InLongMsgCsvFormatFactory.java}   |  56 +--
 .../InLongMsgCsvMixedFormatConverter.java}         |  28 +-
 .../InLongMsgCsvMixedFormatDeserializer.java}      |  36 +-
 .../formats/inlongmsgcsv/InLongMsgCsvUtils.java}   |  54 +--
 .../org.apache.flink.table.factories.TableFactory  |   2 +-
 .../InLongMsgCsvFormatDeserializerTest.java}       | 152 +++---
 .../InLongMsgCsvFormatFactoryTest.java}            |  24 +-
 .../formats/inlongmsgcsv/InLongMsgCsvTest.java}    |  18 +-
 .../src/test/resources/log4j-test.properties       |   0
 inlong-sort/sort-formats/pom.xml                   |   4 +-
 .../example/consumer/test_consumer.cc              | 295 ++++++------
 .../example/consumer/test_multi_thread_filter.cc   | 533 +++++++++++----------
 .../example/consumer/test_multithread_pull.cc      | 355 +++++++-------
 licenses/inlong-dataproxy/LICENSE-binary           |   2 +-
 .../LICENSE-google-protobuf-java-format.txt        |  54 +--
 licenses/inlong-tubemq/LICENSE-binary              |   2 +-
 .../LICENSE-google-protobuf-java-format.txt        |  54 +--
 63 files changed, 1256 insertions(+), 1253 deletions(-)

diff --git a/LICENSE b/LICENSE
index 0bcceda..93c347b 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-
+
                                  Apache License
                            Version 2.0, January 2004
                         http://www.apache.org/licenses/
diff --git a/docker/kubernetes/templates/NOTES.txt b/docker/kubernetes/templates/NOTES.txt
index 7633784..37b4f95 100644
--- a/docker/kubernetes/templates/NOTES.txt
+++ b/docker/kubernetes/templates/NOTES.txt
@@ -1,39 +1,39 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-1. Get the application URL by running these commands:
-{{/*{{- if .Values.ingress.enabled }}*/}}
-{{/*{{- range $host := .Values.ingress.hosts }}*/}}
-{{/*  {{- range .paths }}*/}}
-{{/*  http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }}*/}}
-{{/*  {{- end }}*/}}
-{{/*{{- end }}*/}}
-{{/*{{- else if contains "NodePort" .Values.service.type }}*/}}
-{{/*  export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "inlong.fullname" . }})*/}}
-{{/*  export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}")*/}}
-{{/*  echo http://$NODE_IP:$NODE_PORT*/}}
-{{/*{{- else if contains "LoadBalancer" .Values.service.type }}*/}}
-{{/*     NOTE: It may take a few minutes for the LoadBalancer IP to be available.*/}}
-{{/*           You can watch the status of by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "inlong.fullname" . }}'*/}}
-{{/*  export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "inlong.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")*/}}
-{{/*  echo http://$SERVICE_IP:{{ .Values.service.port }}*/}}
-{{/*{{- else if contains "ClusterIP" .Values.service.type }}*/}}
-{{/*  export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "inlong.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")*/}}
-{{/*  export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")*/}}
-{{/*  echo "Visit http://127.0.0.1:8080 to use your application"*/}}
-{{/*  kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT*/}}
-{{/*{{- end }}*/}}
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+1. Get the application URL by running these commands:
+{{/*{{- if .Values.ingress.enabled }}*/}}
+{{/*{{- range $host := .Values.ingress.hosts }}*/}}
+{{/*  {{- range .paths }}*/}}
+{{/*  http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }}*/}}
+{{/*  {{- end }}*/}}
+{{/*{{- end }}*/}}
+{{/*{{- else if contains "NodePort" .Values.service.type }}*/}}
+{{/*  export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "inlong.fullname" . }})*/}}
+{{/*  export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}")*/}}
+{{/*  echo http://$NODE_IP:$NODE_PORT*/}}
+{{/*{{- else if contains "LoadBalancer" .Values.service.type }}*/}}
+{{/*     NOTE: It may take a few minutes for the LoadBalancer IP to be available.*/}}
+{{/*           You can watch the status of it by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "inlong.fullname" . }}'*/}}
+{{/*  export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "inlong.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")*/}}
+{{/*  echo http://$SERVICE_IP:{{ .Values.service.port }}*/}}
+{{/*{{- else if contains "ClusterIP" .Values.service.type }}*/}}
+{{/*  export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "inlong.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")*/}}
+{{/*  export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")*/}}
+{{/*  echo "Visit http://127.0.0.1:8080 to use your application"*/}}
+{{/*  kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT*/}}
+{{/*{{- end }}*/}}
diff --git a/inlong-audit/audit-store/pom.xml b/inlong-audit/audit-store/pom.xml
index 3ae6690..965a0c4 100644
--- a/inlong-audit/audit-store/pom.xml
+++ b/inlong-audit/audit-store/pom.xml
@@ -32,10 +32,6 @@
 
     <dependencies>
         <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>audit-common</artifactId>
             <version>${project.version}</version>
@@ -291,6 +287,7 @@
                 <plugin>
                     <groupId>org.springframework.boot</groupId>
                     <artifactId>spring-boot-maven-plugin</artifactId>
+                    <version>${spring.plugin.version}</version>
                     <executions>
                         <execution>
                             <id>repackage</id>
@@ -324,7 +321,7 @@
                         <dependency>
                             <groupId>org.springframework.boot</groupId>
                             <artifactId>spring-boot-maven-plugin</artifactId>
-                            <version>2.3.3.RELEASE</version>
+                            <version>${spring.plugin.version}</version>
                         </dependency>
                     </dependencies>
                     <executions>
@@ -371,10 +368,10 @@
             <plugin>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-maven-plugin</artifactId>
+                <version>${spring.plugin.version}</version>
                 <configuration>
                     <layout>ZIP</layout>
                     <includes>
-                        <!-- 打包时,本jar包不包含其他依赖包 , 否则打出的jar包还是很大 -->
                         <include>
                             <groupId>nothing</groupId>
                             <artifactId>nothing</artifactId>
diff --git a/inlong-audit/pom.xml b/inlong-audit/pom.xml
index 7c5e6e5..52bb50e 100644
--- a/inlong-audit/pom.xml
+++ b/inlong-audit/pom.xml
@@ -63,16 +63,12 @@
         <gson.version>2.8.6</gson.version>
         <jackson.version>2.12.3</jackson.version>
         <junit.version>4.12</junit.version>
-        <autoconfigure.version>2.4.3</autoconfigure.version>   
+        <autoconfigure.version>2.4.3</autoconfigure.version>
+        <spring.plugin.version>2.3.3.RELEASE</spring.plugin.version>
     </properties>
     <dependencyManagement>
         <dependencies>
             <dependency>
-                <groupId>org.apache.pulsar</groupId>
-                <artifactId>pulsar-client</artifactId>
-                <version>${pulsar.version}</version>
-            </dependency>
-            <dependency>
                 <groupId>com.google.protobuf</groupId>
                 <artifactId>protobuf-java</artifactId>
                 <version>${protobuf-version}</version>
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java b/inlong-common/src/main/java/org/apache/inlong/commons/msg/InLongMsg.java
similarity index 96%
rename from inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java
rename to inlong-common/src/main/java/org/apache/inlong/commons/msg/InLongMsg.java
index be871ed..2dcb62b 100644
--- a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/msg/InLongMsg.java
@@ -30,7 +30,7 @@ import java.util.Set;
 
 import org.xerial.snappy.Snappy;
 
-public class TDMsg1 {
+public class InLongMsg {
     private static final int DEFAULT_CAPACITY = 4096;
     private final int capacity;
 
@@ -143,28 +143,28 @@ public class TDMsg1 {
      *
      * @return
      */
-    public static TDMsg1 newTDMsg() {
-        return newTDMsg(true);
+    public static InLongMsg newInLongMsg() {
+        return newInLongMsg(true);
     }
 
     /**
      * capacity: 4096, version: 1
      *
      * @param compress if copress
-     * @return TDMsg1
+     * @return InLongMsg
      */
-    public static TDMsg1 newTDMsg(boolean compress) {
-        return newTDMsg(DEFAULT_CAPACITY, compress);
+    public static InLongMsg newInLongMsg(boolean compress) {
+        return newInLongMsg(DEFAULT_CAPACITY, compress);
     }
 
     /**
      * capacity: 4096, compress: true
      *
      * @param v version info
-     * @return TDMsg1
+     * @return InLongMsg
      */
-    public static TDMsg1 newTDMsg(int v) {
-        return newTDMsg(DEFAULT_CAPACITY, true, v);
+    public static InLongMsg newInLongMsg(int v) {
+        return newInLongMsg(DEFAULT_CAPACITY, true, v);
     }
 
     /**
@@ -172,10 +172,10 @@ public class TDMsg1 {
      *
      * @param compress if compress
      * @param v        version
-     * @return TDMsg1
+     * @return InLongMsg
      */
-    public static TDMsg1 newTDMsg(boolean compress, int v) {
-        return newTDMsg(DEFAULT_CAPACITY, compress, v);
+    public static InLongMsg newInLongMsg(boolean compress, int v) {
+        return newInLongMsg(DEFAULT_CAPACITY, compress, v);
     }
 
     /**
@@ -183,25 +183,25 @@ public class TDMsg1 {
      *
      * @param capacity data capacity
      * @param compress if compress
-     * @return TDMsg1
+     * @return InLongMsg
      */
-    public static TDMsg1 newTDMsg(int capacity, boolean compress) {
-        return new TDMsg1(capacity, compress, Version.v1);
+    public static InLongMsg newInLongMsg(int capacity, boolean compress) {
+        return new InLongMsg(capacity, compress, Version.v1);
     }
 
     /**
-     * netTDmsg
+     * newInLongMsg
      * @param capacity data capacity
      * @param compress compress
      * @param v        version
-     * @return TDMsg1
+     * @return InLongMsg
      */
-    public static TDMsg1 newTDMsg(int capacity, boolean compress, int v) {
-        return new TDMsg1(capacity, compress, Version.of(v));
+    public static InLongMsg newInLongMsg(int capacity, boolean compress, int v) {
+        return new InLongMsg(capacity, compress, Version.of(v));
     }
 
     // for create
-    private TDMsg1(int capacity, boolean compress, Version v) {
+    private InLongMsg(int capacity, boolean compress, Version v) {
         version = v;
         addmode = true;
         this.compress = compress;
@@ -616,7 +616,7 @@ public class TDMsg1 {
     private ByteBuffer parsedBinInput;
 
     // for parsed
-    private TDMsg1(ByteBuffer buffer, Version magic) throws IOException {
+    private InLongMsg(ByteBuffer buffer, Version magic) throws IOException {
         version = magic;
         addmode = false;
         capacity = 0;
@@ -914,18 +914,18 @@ public class TDMsg1 {
         return Version.vn;
     }
 
-    public static TDMsg1 parseFrom(byte[] data) {
+    public static InLongMsg parseFrom(byte[] data) {
         return parseFrom(ByteBuffer.wrap(data));
     }
 
-    public static TDMsg1 parseFrom(ByteBuffer buffer) {
+    public static InLongMsg parseFrom(ByteBuffer buffer) {
         Version magic = getMagic(buffer);
         if (magic == Version.vn) {
             return null;
         }
 
         try {
-            return new TDMsg1(buffer, magic);
+            return new InLongMsg(buffer, magic);
         } catch (IOException e) {
             return null;
         }
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsgAttrBuilder.java b/inlong-common/src/main/java/org/apache/inlong/commons/msg/InLongMsgAttrBuilder.java
similarity index 88%
rename from inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsgAttrBuilder.java
rename to inlong-common/src/main/java/org/apache/inlong/commons/msg/InLongMsgAttrBuilder.java
index 69c20f1..30c0471 100644
--- a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsgAttrBuilder.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/msg/InLongMsgAttrBuilder.java
@@ -22,7 +22,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
-public class TDMsgAttrBuilder {
+public class InLongMsgAttrBuilder {
 
     public enum PartitionUnit {
         DAY("d"), HOUR("h"), HALFHOUR("n"),
@@ -334,93 +334,93 @@ public class TDMsgAttrBuilder {
         SimpleDateFormat f1 =
                 new SimpleDateFormat("yyyyMMddHH");
 
-        System.out.println(TDMsgAttrBuilder.getProtocolM0()
+        System.out.println(InLongMsgAttrBuilder.getProtocolM0()
                 .setId("interfaceid").setTimeType(TimeType.S)
                 .setTime(String.valueOf(System.currentTimeMillis() / 1000))
                 .buildAttr());
-        System.out.println(TDMsgAttrBuilder.getProtocolM0()
+        System.out.println(InLongMsgAttrBuilder.getProtocolM0()
                 .setId("interfaceid").setTime(System.currentTimeMillis())
                 .setTimeType(TimeType.MS).buildAttr());
-        System.out.println(TDMsgAttrBuilder.getProtocolM0()
+        System.out.println(InLongMsgAttrBuilder.getProtocolM0()
                 .setId("interfaceid")
                 .setTime(f1.format(new Date(System.currentTimeMillis())))
                 .buildAttr());
-        System.out.println(TDMsgAttrBuilder.getProtocolM0()
+        System.out.println(InLongMsgAttrBuilder.getProtocolM0()
                 .setId("interfaceid")
                 .setTime(f.format(new Date(System.currentTimeMillis())))
                 .setTimeType(TimeType.STANDARD).buildAttr());
-        System.out.println(TDMsgAttrBuilder.getProtocolM0()
+        System.out.println(InLongMsgAttrBuilder.getProtocolM0()
                 .setId("interfaceid")
                 .setTime(f1.format(new Date(System.currentTimeMillis())))
                 .setTimeType(TimeType.NORMAL).buildAttr());
-        System.out.println(TDMsgAttrBuilder.getProtocolM0()
+        System.out.println(InLongMsgAttrBuilder.getProtocolM0()
                 .setId("interfaceid").setTimeType(TimeType.S)
                 .setTime(String.valueOf(System.currentTimeMillis() / 1000))
                 .setPartitionUnit(PartitionUnit.DAY).buildAttr());
-        System.out.println(TDMsgAttrBuilder.getProtocolM0()
+        System.out.println(InLongMsgAttrBuilder.getProtocolM0()
                 .setId("interfaceid").setTimeType(TimeType.S)
                 .setTime(String.valueOf(System.currentTimeMillis() / 1000))
                 .setPartitionUnit(PartitionUnit.HOUR).buildAttr());
-        System.out.println(TDMsgAttrBuilder.getProtocolM0()
+        System.out.println(InLongMsgAttrBuilder.getProtocolM0()
                 .setId("interfaceid").setTimeType(TimeType.S)
                 .setTime(String.valueOf(System.currentTimeMillis() / 1000))
                 .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
         System.out.println();
 
-        System.out.print(TDMsgAttrBuilder.getProtocolM100().buildAttr());
+        System.out.print(InLongMsgAttrBuilder.getProtocolM100().buildAttr());
         System.out.println("\t\t\t\t\t\t// ---- all the param is "
                 + "default : s=\\t, idp=0, tp=1, tt=#ms, p=h ");
 
-        System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+        System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
                 .buildAttr());
         System.out.println("\t\t\t\t\t// ---- : idp=0, tp=1, tt=#ms, p=h ");
 
-        System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+        System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
                 .setIdPos(0).buildAttr());
         System.out.println("\t\t\t\t\t// ---- : tp=1, tt=#ms, p=h ");
 
-        System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+        System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
                 .setTimePos(1).buildAttr());
         System.out.println("\t\t\t\t\t// ---- : idp=0, tt=#ms, p=h ");
 
-        System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+        System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
                 .setIdPos(0).setTimePos(1).buildAttr());
         System.out.println("\t\t\t\t// ---- : tt=#ms, p=h ");
 
-        System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+        System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
                 .setIdPos(0).setTimePos(1).setTimeType(TimeType.S).buildAttr());
         System.out.println("\t\t\t// ---- : p=h ");
 
-        System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+        System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
                 .setIdPos(0).setTimePos(1)
                 .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
         System.out.println("\t\t\t// ---- : tt=#s ");
 
-        System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+        System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
                 .setIdPos(0).setTimePos(1).setTimeType(TimeType.MS)
                 .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
         System.out.println("\t\t\t// ---- : all ");
 
-        System.out.print(TDMsgAttrBuilder.getProtocolM100()
+        System.out.print(InLongMsgAttrBuilder.getProtocolM100()
                 .setId("interfaceid").setSpliter(",").setIdPos(0).setTimePos(1)
                 .setTimeType(TimeType.MS)
                 .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
         System.out.println("\t// ---- : id is set so idp is ignored ");
 
-        System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+        System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
                 .setIdPos(0).setTimePos(1).setTime(System.currentTimeMillis())
                 .setTimeType(TimeType.MS)
                 .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
         System.out.println("\t\t\t// ---- : t is set so tp is ignored ");
 
-        System.out.print(TDMsgAttrBuilder.getProtocolM100()
+        System.out.print(InLongMsgAttrBuilder.getProtocolM100()
                 .setId("interfaceid").setSpliter(",").setIdPos(0).setTimePos(1)
                 .setTime(System.currentTimeMillis()).setTimeType(TimeType.MS)
                 .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
         System.out
                 .println("\t\t// ---- : id and t are all set so idpos and tp are all ignored ");
 
-        System.out.print(TDMsgAttrBuilder.getProtocolM100()
+        System.out.print(InLongMsgAttrBuilder.getProtocolM100()
                 .setId("interfaceid").setSpliter(",").setIdPos(0).setTimePos(1)
                 .setTime(f1.format(new Date(System.currentTimeMillis())))
                 .setTimeType(TimeType.NORMAL)
@@ -432,21 +432,21 @@ public class TDMsgAttrBuilder {
 
         // long time = System.currentTimeMillis();
         // byte[] data0 = ("id," + time + ",other,data").getBytes();
-        // String attr = TDMsgProtocolFactory.getProtololM100().setSpliter(",")
+        // String attr = InLongMsgProtocolFactory.getProtololM100().setSpliter(",")
         // .setIdPos(0).setTimePos(1).setTimeType(TimeType.MS)
         // .setPartitionUnit(PartitionUnit.QUARTER).buildAttr();
-        // TDMsg1 tdmsg = TDMsg1.newTDMsg();
-        // tdmsg.addMsg(attr, data0);
-        // byte[] result = tdmsg.buildArray();
+        // InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        // inLongMsg.addMsg(attr, data0);
+        // byte[] result = inLongMsg.buildArray();
         // System.out.println(new String(result));
-        // attr = TDMsgProtocolFactory.getProtololM0().setSpliter(",")
+        // attr = InLongMsgProtocolFactory.getProtololM0().setSpliter(",")
         // .setTime(System.currentTimeMillis()).setTimeType(TimeType.MS)
         // .setPartitionUnit(PartitionUnit.QUARTER).buildAttr();
-        // tdmsg = TDMsg1.newTDMsg();
-        // tdmsg.addMsg(attr, data0);
-        // result = tdmsg.buildArray();
+        // inLongMsg = InLongMsg.newInLongMsg();
+        // inLongMsg.addMsg(attr, data0);
+        // result = inLongMsg.buildArray();
         // System.out.println(new String(result));
-        System.out.print(TDMsgAttrBuilder.getProtocolM100().setSpliter(",")
+        System.out.print(InLongMsgAttrBuilder.getProtocolM100().setSpliter(",")
                 .setIdPos(0).setTimePos(1)
 
                 .setTimeType(TimeType.STANDARD)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index ba00758..c57a205 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -37,7 +37,7 @@ import org.apache.inlong.commons.monitor.CounterGroupExt;
 import org.apache.inlong.commons.monitor.MonitorIndex;
 import org.apache.inlong.commons.monitor.MonitorIndexExt;
 import org.apache.inlong.commons.monitor.StatConstants;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
 import org.apache.inlong.commons.util.NetworkUtils;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
@@ -128,14 +128,14 @@ public class SimpleMessageHandler
             msgCount = "1";
         }
 
-        TDMsg1 tdMsg = TDMsg1.newTDMsg(true);
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg(true);
         String charset = (String) context.get(HttpSourceConstants.CHARSET);
         if (charset == null || "".equals(charset)) {
             charset = "UTF-8";
         }
         String body = (String) context.get(HttpSourceConstants.BODY);
         try {
-            tdMsg.addMsg(newAttrBuffer.toString(), body.getBytes(charset));
+            inLongMsg.addMsg(newAttrBuffer.toString(), body.getBytes(charset));
         } catch (UnsupportedEncodingException e) {
             throw new MessageProcessException(e);
         }
@@ -147,9 +147,9 @@ public class SimpleMessageHandler
         headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
         headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
         headers.put(ConfigConstants.MSG_COUNTER_KEY, msgCount);
-        byte[] data = tdMsg.buildArray();
+        byte[] data = inLongMsg.buildArray();
         headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
-        String pkgTime = dateFormator.get().format(tdMsg.getCreatetime());
+        String pkgTime = dateFormator.get().format(inLongMsg.getCreatetime());
         headers.put(ConfigConstants.PKG_TIME_KEY, pkgTime);
         Event event = EventBuilder.withBody(data, headers);
 
@@ -178,7 +178,7 @@ public class SimpleMessageHandler
             monitorIndex
                     .addAndGet(new String(newbase), Integer.parseInt(msgCount), 1, data.length, 0);
         }
-        tdMsg.reset();
+        inLongMsg.reset();
 
         long beginTime = System.currentTimeMillis();
         try {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 44dab2c..362ee71 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -41,7 +41,7 @@ import org.apache.flume.event.EventBuilder;
 import org.apache.flume.source.AbstractSource;
 import org.apache.inlong.commons.monitor.MonitorIndex;
 import org.apache.inlong.commons.monitor.MonitorIndexExt;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
 import org.apache.inlong.dataproxy.base.ProxyMessage;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
@@ -384,33 +384,33 @@ public class ServerMessageHandler extends SimpleChannelHandler {
             Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
             String strRemoteIP, MsgType msgType) throws MessageIDException {
 
-        int tdMsgVer = 1;
+        int inLongMsgVer = 1;
         if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
-            tdMsgVer = 3;
+            inLongMsgVer = 3;
         } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
-            tdMsgVer = 4;
+            inLongMsgVer = 4;
         }
 
         for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry : messageMap.entrySet()) {
             for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : topicEntry.getValue().entrySet()) {
 
-                TDMsg1 tdMsg = TDMsg1.newTDMsg(this.isCompressed, tdMsgVer);
+                InLongMsg inLongMsg = InLongMsg.newInLongMsg(this.isCompressed, inLongMsgVer);
                 Map<String, String> headers = new HashMap<String, String>();
                 for (ProxyMessage message : streamIdEntry.getValue()) {
                     if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
                         message.getAttributeMap().put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1));
-                        tdMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
+                        inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
                     } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
-                        tdMsg.addMsg(message.getData());
+                        inLongMsg.addMsg(message.getData());
                     } else {
-                        tdMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
+                        inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
                     }
                 }
 
-                long pkgTimeInMillis = tdMsg.getCreatetime();
+                long pkgTimeInMillis = inLongMsg.getCreatetime();
                 String pkgTimeStr = dateFormator.get().format(pkgTimeInMillis);
 
-                if (tdMsgVer == 4) {
+                if (inLongMsgVer == 4) {
                     if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
                         pkgTimeStr = commonAttrMap.get(ConfigConstants.PKG_TIME_KEY);
                     } else {
@@ -432,7 +432,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                 String proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
                 headers.put(ConfigConstants.MSG_COUNTER_KEY, proxyMetricMsgCnt);
 
-                byte[] data = tdMsg.buildArray();
+                byte[] data = inLongMsg.buildArray();
                 headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
 
                 String sequenceId = commonAttrMap.get(AttributeConstants.SEQUENCE_ID);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 2159b41..f00f19f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -40,7 +40,7 @@ import org.apache.flume.Event;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.source.AbstractSource;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
 import org.apache.inlong.dataproxy.base.ProxyMessage;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
@@ -375,33 +375,33 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
             Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
             String strRemoteIP, MsgType msgType) throws MessageIDException {
 
-        int tdMsgVer = 1;
+        int inLongMsgVer = 1;
         if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
-            tdMsgVer = 3;
+            inLongMsgVer = 3;
         } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
-            tdMsgVer = 4;
+            inLongMsgVer = 4;
         }
 
         for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry : messageMap.entrySet()) {
             for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : topicEntry.getValue().entrySet()) {
 
-                TDMsg1 tdMsg = TDMsg1.newTDMsg(this.isCompressed, tdMsgVer);
+                InLongMsg inLongMsg = InLongMsg.newInLongMsg(this.isCompressed, inLongMsgVer);
                 Map<String, String> headers = new HashMap<String, String>();
                 for (ProxyMessage message : streamIdEntry.getValue()) {
                     if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
                         message.getAttributeMap().put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1));
-                        tdMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
+                        inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
                     } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
-                        tdMsg.addMsg(message.getData());
+                        inLongMsg.addMsg(message.getData());
                     } else {
-                        tdMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
+                        inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
                     }
                 }
 
-                long pkgTimeInMillis = tdMsg.getCreatetime();
+                long pkgTimeInMillis = inLongMsg.getCreatetime();
                 String pkgTimeStr = dateFormator.get().format(pkgTimeInMillis);
 
-                if (tdMsgVer == 4) {
+                if (inLongMsgVer == 4) {
                     if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
                         pkgTimeStr = commonAttrMap.get(ConfigConstants.PKG_TIME_KEY);
                     } else {
@@ -421,7 +421,7 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
                 String proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
                 headers.put(ConfigConstants.MSG_COUNTER_KEY, proxyMetricMsgCnt);
 
-                byte[] data = tdMsg.buildArray();
+                byte[] data = inLongMsg.buildArray();
                 headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
 
                 String sequenceId = commonAttrMap.get(AttributeConstants.SEQUENCE_ID);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
index d5f0ca1..1263bc0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
@@ -51,7 +51,7 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.DataFlowInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveFileFormat;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveTimePartitionInfo;
@@ -226,7 +226,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
         DeserializationInfo deserializationInfo = null;
         boolean isDbType = BizConstant.DATA_SOURCE_DB.equals(storageInfo.getDataSourceType());
         if (!isDbType) {
-            // FILE and auto push source, the data format is TEXT or KEY-VALUE, temporarily use TDMsgCsv
+            // FILE and auto push source, the data format is TEXT or KEY-VALUE, temporarily use InLongMsgCsv
             String dataType = storageInfo.getDataType();
             if (BizConstant.DATA_TYPE_TEXT.equalsIgnoreCase(dataType)
                     || BizConstant.DATA_TYPE_KEY_VALUE.equalsIgnoreCase(dataType)) {
@@ -238,7 +238,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
                     escape = info.getDataEscapeChar().charAt(0);
                 }*/
                 // Whether to delete the first separator, the default is false for the time being
-                deserializationInfo = new TDMsgCsvDeserializationInfo(storageInfo.getInlongStreamId(), separator);
+                deserializationInfo = new InLongMsgCsvDeserializationInfo(storageInfo.getInlongStreamId(), separator);
             }
         }
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
index f911311..844f506 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
@@ -32,11 +32,11 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
         property = "type")
 @JsonSubTypes({
         @Type(value = CsvDeserializationInfo.class, name = "csv"),
-        @Type(value = TDMsgCsvDeserializationInfo.class, name = "tdmsg_csv"),
-        @Type(value = TDMsgCsv2DeserializationInfo.class, name = "tdmsg_csv2"),
-        @Type(value = TDMsgKvDeserializationInfo.class, name = "tdmsg_kv"),
-        @Type(value = TDMsgTlogCsvDeserializationInfo.class, name = "tdmsg_tlog_csv"),
-        @Type(value = TDMsgTlogKvDeserializationInfo.class, name = "tdmsg_tlog_kv")
+        @Type(value = InLongMsgCsvDeserializationInfo.class, name = "inlongmsg_csv"),
+        @Type(value = InLongMsgCsv2DeserializationInfo.class, name = "inlongmsg_csv2"),
+        @Type(value = InLongMsgKvDeserializationInfo.class, name = "inlongmsg_kv"),
+        @Type(value = InLongMsgTlogCsvDeserializationInfo.class, name = "inlongmsg_tlog_csv"),
+        @Type(value = InLongMsgTlogKvDeserializationInfo.class, name = "inlongmsg_tlog_kv")
 })
 public interface DeserializationInfo extends Serializable {
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgCsv2DeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
similarity index 88%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgCsv2DeserializationInfo.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
index 4039ee8..0806717 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgCsv2DeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
@@ -21,16 +21,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 /**
- * It represents CSV2 format of TDMsg(m=9).
+ * It represents CSV2 format of InLongMsg(m=9).
  */
-public class TDMsgCsv2DeserializationInfo extends TDMsgDeserializationInfo {
+public class InLongMsgCsv2DeserializationInfo extends InLongMsgDeserializationInfo {
 
     private static final long serialVersionUID = 2188769102604850019L;
 
     private final char delimiter;
 
     @JsonCreator
-    public TDMsgCsv2DeserializationInfo(
+    public InLongMsgCsv2DeserializationInfo(
             @JsonProperty("tid") String tid,
             @JsonProperty("delimiter") char delimiter) {
         super(tid);
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgCsvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
similarity index 90%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgCsvDeserializationInfo.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
index 322a8d0..890cea3 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgCsvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
@@ -23,9 +23,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInc
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 /**
- * It represents CSV format of TDMsg(m=0).
+ * It represents CSV format of InLongMsg(m=0).
  */
-public class TDMsgCsvDeserializationInfo extends TDMsgDeserializationInfo {
+public class InLongMsgCsvDeserializationInfo extends InLongMsgDeserializationInfo {
 
     private static final long serialVersionUID = 1499370571949888870L;
 
@@ -34,14 +34,14 @@ public class TDMsgCsvDeserializationInfo extends TDMsgDeserializationInfo {
     @JsonInclude(Include.NON_NULL)
     private final boolean deleteHeadDelimiter;
 
-    public TDMsgCsvDeserializationInfo(
+    public InLongMsgCsvDeserializationInfo(
             @JsonProperty("tid") String tid,
             @JsonProperty("delimiter") char delimiter) {
         this(tid, delimiter, true);
     }
 
     @JsonCreator
-    public TDMsgCsvDeserializationInfo(
+    public InLongMsgCsvDeserializationInfo(
             @JsonProperty("tid") String tid,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("delete_head_delimiter") boolean deleteHeadDelimiter) {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
similarity index 86%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgDeserializationInfo.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
index 1b36161..e311beb 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
@@ -22,15 +22,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 /**
- * TDMsgDeserializationInfo.
+ * InLongMsgDeserializationInfo.
  */
-public abstract class TDMsgDeserializationInfo implements DeserializationInfo {
+public abstract class InLongMsgDeserializationInfo implements DeserializationInfo {
 
     private static final long serialVersionUID = 3707412713264864315L;
 
     private final String tid;
 
-    public TDMsgDeserializationInfo(@JsonProperty("tid") String tid) {
+    public InLongMsgDeserializationInfo(@JsonProperty("tid") String tid) {
         this.tid = checkNotNull(tid);
     }
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgKvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
similarity index 90%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgKvDeserializationInfo.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
index 068b25e..c61825d 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgKvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
@@ -20,9 +20,9 @@ package org.apache.inlong.sort.protocol.deserialization;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 /**
- * It represents KV format of TDMsg(m=5).
+ * It represents KV format of InLongMsg(m=5).
  */
-public class TDMsgKvDeserializationInfo extends TDMsgDeserializationInfo {
+public class InLongMsgKvDeserializationInfo extends InLongMsgDeserializationInfo {
 
     private static final long serialVersionUID = 8431516458466278968L;
 
@@ -30,7 +30,7 @@ public class TDMsgKvDeserializationInfo extends TDMsgDeserializationInfo {
 
     private final char kvDelimiter;
 
-    public TDMsgKvDeserializationInfo(
+    public InLongMsgKvDeserializationInfo(
             @JsonProperty("tid") String tid,
             @JsonProperty("entry_delimiter") char entryDelimiter,
             @JsonProperty("kv_delimiter") char kvDelimiter
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgTlogCsvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
similarity index 88%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgTlogCsvDeserializationInfo.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
index 34822a8..53426ff 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgTlogCsvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
@@ -21,16 +21,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 /**
- * It represents TLog CSV format of TDMsg(m=10).
+ * It represents TLog CSV format of InLongMsg(m=10).
  */
-public class TDMsgTlogCsvDeserializationInfo extends TDMsgDeserializationInfo {
+public class InLongMsgTlogCsvDeserializationInfo extends InLongMsgDeserializationInfo {
 
     private static final long serialVersionUID = -6585242216925992303L;
 
     private final char delimiter;
 
     @JsonCreator
-    public TDMsgTlogCsvDeserializationInfo(
+    public InLongMsgTlogCsvDeserializationInfo(
             @JsonProperty("tid") String tid,
             @JsonProperty("delimiter") char delimiter) {
         super(tid);
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgTlogKvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
similarity index 90%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgTlogKvDeserializationInfo.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
index fb28f8e..ace1038 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgTlogKvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
@@ -20,9 +20,9 @@ package org.apache.inlong.sort.protocol.deserialization;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 /**
- * It represents TLog KV format of TDMsg(m=15).
+ * It represents TLog KV format of InLongMsg(m=15).
  */
-public class TDMsgTlogKvDeserializationInfo extends TDMsgDeserializationInfo {
+public class InLongMsgTlogKvDeserializationInfo extends InLongMsgDeserializationInfo {
 
     private static final long serialVersionUID = 3299931901024581425L;
 
@@ -32,7 +32,7 @@ public class TDMsgTlogKvDeserializationInfo extends TDMsgDeserializationInfo {
 
     private final char kvDelimiter;
 
-    public TDMsgTlogKvDeserializationInfo(
+    public InLongMsgTlogKvDeserializationInfo(
             @JsonProperty("tid") String tid,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("entry_delimiter") char entryDelimiter,
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/DeserializationInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/DeserializationInfoTest.java
index 5959752..e903e9b 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/DeserializationInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/DeserializationInfoTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessin
 import org.apache.inlong.sort.protocol.deserialization.CsvDeserializationInfo;
 import java.io.IOException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
 import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
 import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo.PartitionStrategy;
 import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
@@ -44,7 +44,7 @@ public class DeserializationInfoTest {
         DataFlowInfo dataFlowInfo = new DataFlowInfo(
             1,
             new TubeSourceInfo("topic" + System.currentTimeMillis(), "ma", "cg",
-                new TDMsgCsvDeserializationInfo("tid", ','), new FieldInfo[0]),
+                new InLongMsgCsvDeserializationInfo("tid", ','), new FieldInfo[0]),
             new ClickHouseSinkInfo("url", "dn", "tn", "un", "pw",
                 false, PartitionStrategy.HASH, "pk", new FieldInfo[0], new String[0],
                 100, 100, 100));
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfoTest.java
index f5ffd67..1ebd6f2 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfoTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -35,7 +35,7 @@ public class PulsarSourceInfoTest {
                 new FieldInfo("f1", StringFormatInfo.INSTANCE),
                 new FieldInfo("f2", StringFormatInfo.INSTANCE)
         };
-        DeserializationInfo deserializationInfo = new TDMsgCsvDeserializationInfo("stream", ',');
+        DeserializationInfo deserializationInfo = new InLongMsgCsvDeserializationInfo("stream", ',');
 
         PulsarSourceInfo pulsarSourceInfo = new PulsarSourceInfo(
                 "http://127.0.0.1:8080",
@@ -54,7 +54,7 @@ public class PulsarSourceInfoTest {
             Assert.assertTrue(pulsarSourceInfo.getServiceUrl().equals("pulsar://127.0.0.1:6650"));
             Assert.assertTrue(pulsarSourceInfo.getTopic().equals("business"));
             Assert.assertTrue(pulsarSourceInfo.getSubscriptionName().equals("consumer"));
-            Assert.assertTrue(pulsarSourceInfo.getDeserializationInfo() instanceof TDMsgCsvDeserializationInfo);
+            Assert.assertTrue(pulsarSourceInfo.getDeserializationInfo() instanceof InLongMsgCsvDeserializationInfo);
             Assert.assertTrue(pulsarSourceInfo.getAuthentication() == null);
         } catch (JsonProcessingException e) {
             Assert.fail();
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index d5f4b38..f958a5c 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -128,13 +128,13 @@
 
         <dependency>
             <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-format-tdmsg-base</artifactId>
+            <artifactId>sort-format-inlongmsg-base</artifactId>
             <version>${project.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-format-tdmsg-csv</artifactId>
+            <artifactId>sort-format-inlongmsg-csv</artifactId>
             <version>${project.version}</version>
         </dependency>
 
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgMixedSerializedRecord.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/InLongMsgMixedSerializedRecord.java
similarity index 82%
rename from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgMixedSerializedRecord.java
rename to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/InLongMsgMixedSerializedRecord.java
index 52164dc..362f71a 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgMixedSerializedRecord.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/InLongMsgMixedSerializedRecord.java
@@ -20,9 +20,9 @@ package org.apache.inlong.sort.flink;
 import static org.apache.inlong.sort.configuration.Constants.UNKNOWN_DATAFLOW_ID;
 
 /**
- * Data flow id might not been got from mixed TDMsg data stream.
+ * Data flow id might not been got from mixed InLongMsg data stream.
  */
-public class TDMsgMixedSerializedRecord extends SerializedRecord {
+public class InLongMsgMixedSerializedRecord extends SerializedRecord {
 
     private static final long serialVersionUID = 4075321919886376829L;
 
@@ -31,11 +31,11 @@ public class TDMsgMixedSerializedRecord extends SerializedRecord {
     /**
      * Just satisfy requirement of Flink Pojo definition.
      */
-    public TDMsgMixedSerializedRecord() {
+    public InLongMsgMixedSerializedRecord() {
         super();
     }
 
-    public TDMsgMixedSerializedRecord(String topic, long timestampMillis, byte[] data) {
+    public InLongMsgMixedSerializedRecord(String topic, long timestampMillis, byte[] data) {
         super(UNKNOWN_DATAFLOW_ID, timestampMillis, data);
         this.topic = topic;
     }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
index 5aa92f1..b0853e2 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
@@ -30,7 +30,7 @@ import org.apache.inlong.sort.configuration.Configuration;
 import org.apache.inlong.sort.configuration.Constants;
 import org.apache.inlong.sort.flink.Record;
 import org.apache.inlong.sort.flink.SerializedRecord;
-import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
+import org.apache.inlong.sort.flink.InLongMsgMixedSerializedRecord;
 import org.apache.inlong.sort.flink.metrics.MetricData;
 import org.apache.inlong.sort.flink.metrics.MetricData.MetricSource;
 import org.apache.inlong.sort.flink.metrics.MetricData.MetricType;
@@ -61,7 +61,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
 
     private transient Object schemaLock;
 
-    private transient MultiTenancyTDMsgMixedDeserializer multiTenancyTdMsgMixedDeserializer;
+    private transient MultiTenancyInLongMsgMixedDeserializer multiTenancyInLongMsgMixedDeserializer;
 
     private transient MultiTenancyDeserializer multiTenancyDeserializer;
 
@@ -81,7 +81,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
     @Override
     public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
         schemaLock = new Object();
-        multiTenancyTdMsgMixedDeserializer = new MultiTenancyTDMsgMixedDeserializer();
+        multiTenancyInLongMsgMixedDeserializer = new MultiTenancyInLongMsgMixedDeserializer();
         multiTenancyDeserializer = new MultiTenancyDeserializer();
         fieldMappingTransformer = new FieldMappingTransformer();
         recordTransformer = new RecordTransformer(config.getInteger(Constants.ETL_RECORD_SERIALIZATION_BUFFER_SIZE));
@@ -164,10 +164,11 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
                 collector.collect(serializedSinkRecord);
             });
 
-            if (serializedRecord instanceof TDMsgMixedSerializedRecord) {
-                final TDMsgMixedSerializedRecord tdmsgRecord = (TDMsgMixedSerializedRecord) serializedRecord;
+            if (serializedRecord instanceof InLongMsgMixedSerializedRecord) {
+                final InLongMsgMixedSerializedRecord
+                        inlongmsgRecord = (InLongMsgMixedSerializedRecord) serializedRecord;
                 synchronized (schemaLock) {
-                    multiTenancyTdMsgMixedDeserializer.deserialize(tdmsgRecord, transformCollector);
+                    multiTenancyInLongMsgMixedDeserializer.deserialize(inlongmsgRecord, transformCollector);
                 }
             } else {
                 synchronized (schemaLock) {
@@ -196,7 +197,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
         @Override
         public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
             synchronized (schemaLock) {
-                multiTenancyTdMsgMixedDeserializer.addDataFlow(dataFlowInfo);
+                multiTenancyInLongMsgMixedDeserializer.addDataFlow(dataFlowInfo);
                 multiTenancyDeserializer.addDataFlow(dataFlowInfo);
                 fieldMappingTransformer.addDataFlow(dataFlowInfo);
                 recordTransformer.addDataFlow(dataFlowInfo);
@@ -210,7 +211,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
         @Override
         public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
             synchronized (schemaLock) {
-                multiTenancyTdMsgMixedDeserializer.updateDataFlow(dataFlowInfo);
+                multiTenancyInLongMsgMixedDeserializer.updateDataFlow(dataFlowInfo);
                 multiTenancyDeserializer.updateDataFlow(dataFlowInfo);
                 fieldMappingTransformer.updateDataFlow(dataFlowInfo);
                 recordTransformer.updateDataFlow(dataFlowInfo);
@@ -224,7 +225,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
         @Override
         public void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
             synchronized (schemaLock) {
-                multiTenancyTdMsgMixedDeserializer.removeDataFlow(dataFlowInfo);
+                multiTenancyInLongMsgMixedDeserializer.removeDataFlow(dataFlowInfo);
                 multiTenancyDeserializer.removeDataFlow(dataFlowInfo);
                 fieldMappingTransformer.removeDataFlow(dataFlowInfo);
                 recordTransformer.removeDataFlow(dataFlowInfo);
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/InLongMsgDeserializer.java
similarity index 81%
rename from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgDeserializer.java
rename to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/InLongMsgDeserializer.java
index bda542e..9aea0f7 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgDeserializer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/InLongMsgDeserializer.java
@@ -20,12 +20,12 @@ package org.apache.inlong.sort.flink.deserialization;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.sort.flink.Record;
 import org.apache.inlong.sort.flink.SerializedRecord;
-import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
 
-public class TDMsgDeserializer implements Deserializer<SerializedRecord, Record> {
-    private final AbstractTDMsgFormatDeserializer innerDeserializer;
+public class InLongMsgDeserializer implements Deserializer<SerializedRecord, Record> {
+    private final AbstractInLongMsgFormatDeserializer innerDeserializer;
 
-    public TDMsgDeserializer(AbstractTDMsgFormatDeserializer innerDeserializer) {
+    public InLongMsgDeserializer(AbstractInLongMsgFormatDeserializer innerDeserializer) {
         this.innerDeserializer = innerDeserializer;
     }
 
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/InLongMsgMixedDeserializer.java
similarity index 69%
rename from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
rename to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/InLongMsgMixedDeserializer.java
index d396cb7..47c8df2 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/InLongMsgMixedDeserializer.java
@@ -24,36 +24,36 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
-import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgFormatDeserializer;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedFormatConverter;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
+import org.apache.inlong.sort.flink.InLongMsgMixedSerializedRecord;
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverter;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
 
 /**
- * A deserializer to handle mixed TDMsg records of one topic.
+ * A deserializer to handle mixed InLongMsg records of one topic.
  */
-public class TDMsgMixedDeserializer implements Deserializer<TDMsgMixedSerializedRecord, Record> {
+public class InLongMsgMixedDeserializer implements Deserializer<InLongMsgMixedSerializedRecord, Record> {
 
     /**
      * Each topic should have same preDeserializer, so just keep one.
      */
-    private AbstractTDMsgFormatDeserializer preDeserializer;
+    private AbstractInLongMsgFormatDeserializer preDeserializer;
 
     /**
      * Tid -> deserializer.
      */
-    private final Map<String, TDMsgMixedFormatConverter> deserializers = new HashMap<>();
+    private final Map<String, InLongMsgMixedFormatConverter> deserializers = new HashMap<>();
 
     /**
      * Tid -> data flow ids.
      */
     private final Map<String, Set<Long>> interface2DataFlowsMap = new HashMap<>();
 
-    public TDMsgMixedDeserializer() {
+    public InLongMsgMixedDeserializer() {
     }
 
-    public void updateDataFlow(long dataFlowId, String tid, AbstractTDMsgFormatDeserializer preDeserializer,
-            TDMsgMixedFormatConverter deserializer) {
+    public void updateDataFlow(long dataFlowId, String tid, AbstractInLongMsgFormatDeserializer preDeserializer,
+            InLongMsgMixedFormatConverter deserializer) {
         // always updates preDeserializer
         this.preDeserializer = preDeserializer;
         deserializers.put(tid, deserializer);
@@ -76,14 +76,15 @@ public class TDMsgMixedDeserializer implements Deserializer<TDMsgMixedSerialized
     }
 
     @Override
-    public void deserialize(TDMsgMixedSerializedRecord tdMsgRecord, Collector<Record> collector) throws Exception {
-        preDeserializer.flatMap(tdMsgRecord.getData(), new CallbackCollector<>(mixedRow -> {
-            final String tid = TDMsgUtils.getTidFromMixedRow(mixedRow);
+    public void deserialize(InLongMsgMixedSerializedRecord inLongMsgRecord,
+                            Collector<Record> collector) throws Exception {
+        preDeserializer.flatMap(inLongMsgRecord.getData(), new CallbackCollector<>(mixedRow -> {
+            final String tid = InLongMsgUtils.getTidFromMixedRow(mixedRow);
             final Set<Long> dataFlowIds = interface2DataFlowsMap.get(tid);
             if (dataFlowIds.isEmpty()) {
                 throw new Exception("No data flow found for tid:" + tid);
             }
-            final TDMsgMixedFormatConverter deserializer = deserializers.get(tid);
+            final InLongMsgMixedFormatConverter deserializer = deserializers.get(tid);
             if (deserializer == null) {
                 throw new Exception("No data flow found for tid:" + tid);
             }
@@ -97,12 +98,12 @@ public class TDMsgMixedDeserializer implements Deserializer<TDMsgMixedSerialized
     }
 
     @VisibleForTesting
-    AbstractTDMsgFormatDeserializer getPreDeserializer() {
+    AbstractInLongMsgFormatDeserializer getPreDeserializer() {
         return preDeserializer;
     }
 
     @VisibleForTesting
-    Map<String, TDMsgMixedFormatConverter> getDeserializers() {
+    Map<String, InLongMsgMixedFormatConverter> getDeserializers() {
         return deserializers;
     }
 
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyDeserializer.java
index 3e75aa5..f8b8215 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyDeserializer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyDeserializer.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.sort.flink.deserialization;
 
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
@@ -28,12 +28,12 @@ import org.apache.inlong.sort.flink.Record;
 import org.apache.inlong.sort.flink.SerializedRecord;
 import org.apache.inlong.sort.formats.base.TableFormatConstants;
 import org.apache.inlong.sort.formats.common.RowFormatInfo;
-import org.apache.inlong.sort.formats.tdmsgcsv.TDMsgCsvFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvFormatDeserializer;
 import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
 import org.apache.inlong.sort.protocol.DataFlowInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
 import org.apache.inlong.sort.util.CommonUtils;
 
 public class MultiTenancyDeserializer implements DataFlowInfoListener, Deserializer<SerializedRecord, Record> {
@@ -79,20 +79,21 @@ public class MultiTenancyDeserializer implements DataFlowInfoListener, Deseriali
         final RowFormatInfo rowFormatInfo = CommonUtils.generateRowFormatInfo(fields);
 
         final Deserializer<SerializedRecord, Record> deserializer;
-        if (deserializationInfo instanceof TDMsgCsvDeserializationInfo) {
-            TDMsgCsvDeserializationInfo tdMsgCsvDeserializationInfo = (TDMsgCsvDeserializationInfo) deserializationInfo;
-            TDMsgCsvFormatDeserializer tdMsgCsvFormatDeserializer = new TDMsgCsvFormatDeserializer(
+        if (deserializationInfo instanceof InLongMsgCsvDeserializationInfo) {
+            InLongMsgCsvDeserializationInfo
+                    inLongMsgCsvDeserializationInfo = (InLongMsgCsvDeserializationInfo) deserializationInfo;
+            InLongMsgCsvFormatDeserializer inLongMsgCsvFormatDeserializer = new InLongMsgCsvFormatDeserializer(
                     rowFormatInfo,
                     DEFAULT_TIME_FIELD_NAME,
                     DEFAULT_ATTRIBUTES_FIELD_NAME,
                     TableFormatConstants.DEFAULT_CHARSET,
-                    tdMsgCsvDeserializationInfo.getDelimiter(),
+                    inLongMsgCsvDeserializationInfo.getDelimiter(),
                     null,
                     null,
                     null,
-                    tdMsgCsvDeserializationInfo.isDeleteHeadDelimiter(),
+                    inLongMsgCsvDeserializationInfo.isDeleteHeadDelimiter(),
                     TableFormatConstants.DEFAULT_IGNORE_ERRORS);
-            deserializer = new TDMsgDeserializer(tdMsgCsvFormatDeserializer);
+            deserializer = new InLongMsgDeserializer(inLongMsgCsvFormatDeserializer);
         } else {
             // TODO, support more formats here
             throw new UnsupportedOperationException(
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyInLongMsgMixedDeserializer.java
similarity index 54%
rename from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializer.java
rename to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyInLongMsgMixedDeserializer.java
index 080c4f7..c572a7d 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyInLongMsgMixedDeserializer.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.sort.flink.deserialization;
 
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.nio.charset.StandardCharsets;
@@ -27,32 +27,32 @@ import java.util.Map;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
+import org.apache.inlong.sort.flink.InLongMsgMixedSerializedRecord;
 import org.apache.inlong.sort.formats.common.RowFormatInfo;
-import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgMixedFormatDeserializer;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedFormatConverter;
-import org.apache.inlong.sort.formats.tdmsgcsv.TDMsgCsvMixedFormatConverter;
-import org.apache.inlong.sort.formats.tdmsgcsv.TDMsgCsvMixedFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverter;
+import org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvMixedFormatConverter;
+import org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvMixedFormatDeserializer;
 import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
 import org.apache.inlong.sort.protocol.DataFlowInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgDeserializationInfo;
 import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
 import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
 import org.apache.inlong.sort.util.CommonUtils;
 
 /**
- * A deserializer to handle mixed TDMsg records.
+ * A deserializer to handle mixed InLongMsg records.
  */
-public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
-        Deserializer<TDMsgMixedSerializedRecord, Record> {
+public class MultiTenancyInLongMsgMixedDeserializer implements DataFlowInfoListener,
+        Deserializer<InLongMsgMixedSerializedRecord, Record> {
 
     /**
      * Maps topic to mixed deserializer.
      */
-    private final Map<String, TDMsgMixedDeserializer> mixedDeserializerMap = new HashMap<>();
+    private final Map<String, InLongMsgMixedDeserializer> mixedDeserializerMap = new HashMap<>();
 
     @Override
     public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
@@ -61,16 +61,17 @@ public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
 
     @Override
     public void updateDataFlow(DataFlowInfo dataFlowInfo) {
-        if (!isTDMsgDataFlow(dataFlowInfo)) {
+        if (!isInLongMsgDataFlow(dataFlowInfo)) {
             return;
         }
-        final TDMsgDeserializationInfo tdMsgDeserializationInfo =
-                (TDMsgDeserializationInfo) dataFlowInfo.getSourceInfo().getDeserializationInfo();
+        final InLongMsgDeserializationInfo inLongMsgDeserializationInfo =
+                (InLongMsgDeserializationInfo) dataFlowInfo.getSourceInfo().getDeserializationInfo();
 
-        Pair<AbstractTDMsgMixedFormatDeserializer, TDMsgMixedFormatConverter> allDeserializer = generateDeserializer(
-                dataFlowInfo.getSourceInfo().getFields(), tdMsgDeserializationInfo);
-        final AbstractTDMsgMixedFormatDeserializer preDeserializer = allDeserializer.getLeft();
-        final TDMsgMixedFormatConverter deserializer = allDeserializer.getRight();
+        Pair<AbstractInLongMsgMixedFormatDeserializer, InLongMsgMixedFormatConverter>
+                allDeserializer = generateDeserializer(
+                dataFlowInfo.getSourceInfo().getFields(), inLongMsgDeserializationInfo);
+        final AbstractInLongMsgMixedFormatDeserializer preDeserializer = allDeserializer.getLeft();
+        final InLongMsgMixedFormatConverter deserializer = allDeserializer.getRight();
 
         final String topic;
         if (dataFlowInfo.getSourceInfo() instanceof TubeSourceInfo) {
@@ -81,23 +82,23 @@ public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
             throw new UnsupportedOperationException("Unknown source type " + dataFlowInfo.getSourceInfo());
         }
 
-        final TDMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap
-                 .computeIfAbsent(topic, key -> new TDMsgMixedDeserializer());
+        final InLongMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap
+                 .computeIfAbsent(topic, key -> new InLongMsgMixedDeserializer());
         mixedDeserializer.updateDataFlow(
-                dataFlowInfo.getId(), tdMsgDeserializationInfo.getTid(), preDeserializer, deserializer);
+                dataFlowInfo.getId(), inLongMsgDeserializationInfo.getTid(), preDeserializer, deserializer);
     }
 
     @Override
     public void removeDataFlow(DataFlowInfo dataFlowInfo) {
-        if (!isTDMsgDataFlow(dataFlowInfo)) {
+        if (!isInLongMsgDataFlow(dataFlowInfo)) {
             return;
         }
         final TubeSourceInfo tubeSourceInfo = (TubeSourceInfo) dataFlowInfo.getSourceInfo();
-        final TDMsgDeserializationInfo tdMsgDeserializationInfo = (TDMsgDeserializationInfo) tubeSourceInfo
+        final InLongMsgDeserializationInfo inLongMsgDeserializationInfo = (InLongMsgDeserializationInfo) tubeSourceInfo
                 .getDeserializationInfo();
-        final TDMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap.get(tubeSourceInfo.getTopic());
+        final InLongMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap.get(tubeSourceInfo.getTopic());
         if (mixedDeserializer != null) {
-            mixedDeserializer.removeDataFlow(dataFlowInfo.getId(), tdMsgDeserializationInfo.getTid());
+            mixedDeserializer.removeDataFlow(dataFlowInfo.getId(), inLongMsgDeserializationInfo.getTid());
             if (mixedDeserializer.isEmpty()) {
                 mixedDeserializerMap.remove(tubeSourceInfo.getTopic());
             }
@@ -105,14 +106,14 @@ public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
     }
 
     @VisibleForTesting
-    static boolean isTDMsgDataFlow(DataFlowInfo dataFlowInfo) {
+    static boolean isInLongMsgDataFlow(DataFlowInfo dataFlowInfo) {
         final DeserializationInfo deserializationInfo = dataFlowInfo.getSourceInfo().getDeserializationInfo();
-        return deserializationInfo instanceof TDMsgDeserializationInfo;
+        return deserializationInfo instanceof InLongMsgDeserializationInfo;
     }
 
-    public void deserialize(TDMsgMixedSerializedRecord record, Collector<Record> collector) throws Exception {
+    public void deserialize(InLongMsgMixedSerializedRecord record, Collector<Record> collector) throws Exception {
         final String topic = record.getTopic();
-        final TDMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap.get(topic);
+        final InLongMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap.get(topic);
         if (mixedDeserializer == null) {
             throw new Exception("No schema found for topic:" + topic);
         }
@@ -120,26 +121,26 @@ public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
     }
 
     @VisibleForTesting
-    Pair<AbstractTDMsgMixedFormatDeserializer, TDMsgMixedFormatConverter> generateDeserializer(
+    Pair<AbstractInLongMsgMixedFormatDeserializer, InLongMsgMixedFormatConverter> generateDeserializer(
             FieldInfo[] fields,
-            TDMsgDeserializationInfo tdMsgDeserializationInfo) {
+            InLongMsgDeserializationInfo inLongMsgDeserializationInfo) {
 
         final RowFormatInfo rowFormatInfo =
                 CommonUtils.generateRowFormatInfo(fields);
 
-        final AbstractTDMsgMixedFormatDeserializer preDeserializer;
-        final TDMsgMixedFormatConverter deserializer;
-        if (tdMsgDeserializationInfo instanceof TDMsgCsvDeserializationInfo) {
-            final TDMsgCsvDeserializationInfo csvDeserializationInfo =
-                    (TDMsgCsvDeserializationInfo) tdMsgDeserializationInfo;
-            preDeserializer = new TDMsgCsvMixedFormatDeserializer(
+        final AbstractInLongMsgMixedFormatDeserializer preDeserializer;
+        final InLongMsgMixedFormatConverter deserializer;
+        if (inLongMsgDeserializationInfo instanceof InLongMsgCsvDeserializationInfo) {
+            final InLongMsgCsvDeserializationInfo csvDeserializationInfo =
+                    (InLongMsgCsvDeserializationInfo) inLongMsgDeserializationInfo;
+            preDeserializer = new InLongMsgCsvMixedFormatDeserializer(
                     StandardCharsets.UTF_8.name(),
                     csvDeserializationInfo.getDelimiter(),
                     null,
                     null,
                     csvDeserializationInfo.isDeleteHeadDelimiter(),
                     false);
-            deserializer = new TDMsgCsvMixedFormatConverter(
+            deserializer = new InLongMsgCsvMixedFormatConverter(
                     rowFormatInfo,
                     DEFAULT_TIME_FIELD_NAME,
                     DEFAULT_ATTRIBUTES_FIELD_NAME,
@@ -147,7 +148,7 @@ public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
                     false);
         } else {
             throw new UnsupportedOperationException(
-                    "Not supported yet " + tdMsgDeserializationInfo.getClass().getSimpleName());
+                    "Not supported yet " + inLongMsgDeserializationInfo.getClass().getSimpleName());
         }
 
         return Pair.of(preDeserializer, deserializer);
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
index 20cdd26..31956fa 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
@@ -39,7 +39,7 @@ import org.apache.flink.util.TimeUtils;
 import org.apache.inlong.sort.configuration.Configuration;
 import org.apache.inlong.sort.configuration.Constants;
 import org.apache.inlong.sort.flink.SerializedRecord;
-import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
+import org.apache.inlong.sort.flink.InLongMsgMixedSerializedRecord;
 import org.apache.inlong.sort.meta.MetaManager;
 import org.apache.inlong.sort.util.CommonUtils;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
@@ -386,7 +386,7 @@ public class MultiTenancyTubeConsumer {
                 synchronized (context.getCheckpointLock()) {
                     for (Message message : consumeResult.getMessageList()) {
                         // TODO, optimize for single tid or no tid topic
-                        context.collect(new TDMsgMixedSerializedRecord(
+                        context.collect(new InLongMsgMixedSerializedRecord(
                                 topic, System.currentTimeMillis(), message.getData()));
                     }
                     final String partitionKey = consumeResult.getPartitionKey();
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescription.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescription.java
index df37077..8634ffc 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescription.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescription.java
@@ -20,7 +20,7 @@ package org.apache.inlong.sort.flink.tubemq;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import org.apache.inlong.sort.protocol.deserialization.TDMsgDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgDeserializationInfo;
 import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
 import java.io.Serializable;
 import java.util.HashMap;
@@ -47,7 +47,7 @@ public class TubeSubscriptionDescription implements Serializable {
     private String consumerGroup;
 
     /**
-     * Each topic might include multiple data flow if it's packed by TDMsg.
+     * Each topic might include multiple data flow if it's packed by InLongMsg.
      */
     private final Map<Long, TubeDataFlowDescription> dataFlows = new HashMap<>();
 
@@ -194,8 +194,8 @@ public class TubeSubscriptionDescription implements Serializable {
 
         public static TubeDataFlowDescription generate(long dataFlowId, TubeSourceInfo tubeSourceInfo) {
             String tid = null;
-            if (tubeSourceInfo.getDeserializationInfo() instanceof TDMsgDeserializationInfo) {
-                tid = ((TDMsgDeserializationInfo) tubeSourceInfo.getDeserializationInfo()).getTid();
+            if (tubeSourceInfo.getDeserializationInfo() instanceof InLongMsgDeserializationInfo) {
+                tid = ((InLongMsgDeserializationInfo) tubeSourceInfo.getDeserializationInfo()).getTid();
             }
             return new TubeDataFlowDescription(dataFlowId, tid);
         }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/util/CommonUtils.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/util/CommonUtils.java
index 2edb554..41e0954 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/util/CommonUtils.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/util/CommonUtils.java
@@ -32,7 +32,7 @@ import org.apache.inlong.sort.formats.common.RowFormatInfo;
 import org.apache.inlong.sort.protocol.DataFlowInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgDeserializationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,8 +102,8 @@ public class CommonUtils {
 
             // Get stream id
             final DeserializationInfo deserializationInfo = dataFlowInfo.getSourceInfo().getDeserializationInfo();
-            if (deserializationInfo instanceof TDMsgDeserializationInfo) {
-                streamId = ((TDMsgDeserializationInfo) deserializationInfo).getTid();
+            if (deserializationInfo instanceof InLongMsgDeserializationInfo) {
+                streamId = ((InLongMsgDeserializationInfo) deserializationInfo).getTid();
             }
 
         }
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializerTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/InLongMsgMixedDeserializerTest.java
similarity index 75%
rename from inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializerTest.java
rename to inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/InLongMsgMixedDeserializerTest.java
index 7ed9c9b..e573499 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializerTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/InLongMsgMixedDeserializerTest.java
@@ -27,25 +27,26 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
-import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgFormatDeserializer;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgBody;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgHead;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedFormatConverter;
+import org.apache.inlong.sort.flink.InLongMsgMixedSerializedRecord;
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverter;
 import org.apache.inlong.sort.util.TestLogger;
 import org.apache.inlong.sort.util.TestingUtils.TestingCollector;
 import org.junit.Before;
 import org.junit.Test;
 
 /**
- * Unit test for {@link TDMsgMixedDeserializer}.
+ * Unit test for {@link InLongMsgMixedDeserializer}.
  */
-public class TDMsgMixedDeserializerTest extends TestLogger {
+public class InLongMsgMixedDeserializerTest extends TestLogger {
 
     private final long dataFlowId1 = 1L;
     private final long dataFlowId2 = 2L;
     private final String tid = "tid";
-    private final TestingAbstractTDMsgFormatDeserializer preDeserializer = new TestingAbstractTDMsgFormatDeserializer();
+    private final TestingAbstractInLongMsgFormatDeserializer
+            preDeserializer = new TestingAbstractInLongMsgFormatDeserializer();
     private final TestingConverter deserializer = new TestingConverter();
     private final TestingCollector<Record> collector = new TestingCollector<>();
 
@@ -57,10 +58,11 @@ public class TDMsgMixedDeserializerTest extends TestLogger {
 
     @Test
     public void testUpdateAndRemoveDataFlow() {
-        final TDMsgMixedDeserializer mixedDeserializer = new TDMsgMixedDeserializer();
+        final InLongMsgMixedDeserializer mixedDeserializer = new InLongMsgMixedDeserializer();
         assertTrue(mixedDeserializer.isEmpty());
 
-        mixedDeserializer.updateDataFlow(dataFlowId1, tid, new TestingAbstractTDMsgFormatDeserializer(), deserializer);
+        mixedDeserializer.updateDataFlow(dataFlowId1, tid,
+                new TestingAbstractInLongMsgFormatDeserializer(), deserializer);
         mixedDeserializer.updateDataFlow(dataFlowId2, tid, preDeserializer, deserializer);
         assertFalse(mixedDeserializer.isEmpty());
 
@@ -83,7 +85,7 @@ public class TDMsgMixedDeserializerTest extends TestLogger {
 
     @Test
     public void testDeserialize() throws Exception {
-        final TDMsgMixedDeserializer mixedDeserializer = new TDMsgMixedDeserializer();
+        final InLongMsgMixedDeserializer mixedDeserializer = new InLongMsgMixedDeserializer();
         mixedDeserializer.updateDataFlow(dataFlowId1, tid, preDeserializer, deserializer);
         mixedDeserializer.updateDataFlow(dataFlowId2, tid, preDeserializer, deserializer);
 
@@ -93,7 +95,7 @@ public class TDMsgMixedDeserializerTest extends TestLogger {
         row.setField(2, tid);
         preDeserializer.records.add(row);
 
-        mixedDeserializer.deserialize(new TDMsgMixedSerializedRecord(), collector);
+        mixedDeserializer.deserialize(new InLongMsgMixedSerializedRecord(), collector);
         assertEquals(2, collector.results.size());
         assertEquals(dataFlowId1, collector.results.get(0).getDataflowId());
         assertEquals(tid, collector.results.get(0).getRow().getField(2));
@@ -101,13 +103,13 @@ public class TDMsgMixedDeserializerTest extends TestLogger {
         assertEquals(tid, collector.results.get(1).getRow().getField(2));
     }
 
-    private static class TestingAbstractTDMsgFormatDeserializer extends AbstractTDMsgFormatDeserializer {
+    private static class TestingAbstractInLongMsgFormatDeserializer extends AbstractInLongMsgFormatDeserializer {
 
         private static final long serialVersionUID = -5356395595688009428L;
 
         public final Queue<Row> records = new ArrayDeque<>();
 
-        public TestingAbstractTDMsgFormatDeserializer() {
+        public TestingAbstractInLongMsgFormatDeserializer() {
             super(false);
         }
 
@@ -117,17 +119,17 @@ public class TDMsgMixedDeserializerTest extends TestLogger {
         }
 
         @Override
-        protected TDMsgHead parseHead(String s) throws Exception {
+        protected InLongMsgHead parseHead(String s) throws Exception {
             return null;
         }
 
         @Override
-        protected TDMsgBody parseBody(byte[] bytes) throws Exception {
+        protected InLongMsgBody parseBody(byte[] bytes) throws Exception {
             return null;
         }
 
         @Override
-        protected Row convertRow(TDMsgHead tdMsgHead, TDMsgBody tdMsgBody) throws Exception {
+        protected Row convertRow(InLongMsgHead inLongMsgHead, InLongMsgBody inLongMsgBody) throws Exception {
             return null;
         }
 
@@ -137,7 +139,7 @@ public class TDMsgMixedDeserializerTest extends TestLogger {
         }
     }
 
-    private static class TestingConverter implements TDMsgMixedFormatConverter {
+    private static class TestingConverter implements InLongMsgMixedFormatConverter {
 
         private static final long serialVersionUID = -2239130770704933846L;
 
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyInLongMsgMixedDeserializerTest.java
similarity index 71%
rename from inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
rename to inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyInLongMsgMixedDeserializerTest.java
index fddb22c..1f65a9e 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyInLongMsgMixedDeserializerTest.java
@@ -25,15 +25,15 @@ import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
 import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
+import org.apache.inlong.sort.flink.InLongMsgMixedSerializedRecord;
 import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
 import org.apache.inlong.sort.protocol.DataFlowInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
 import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
 import org.apache.inlong.sort.util.TestLogger;
 import org.apache.inlong.sort.util.TestingUtils.EmptySinkInfo;
@@ -42,30 +42,30 @@ import org.apache.inlong.sort.util.TestingUtils.TestingCollector;
 import org.junit.Test;
 
 /**
- * Unit test for {@link MultiTenancyTDMsgMixedDeserializer}.
+ * Unit test for {@link MultiTenancyInLongMsgMixedDeserializer}.
  */
-public class MultiTenancyTDMsgMixedDeserializerTest extends TestLogger {
+public class MultiTenancyInLongMsgMixedDeserializerTest extends TestLogger {
 
     @Test
-    public void testIsTDMsgDataFlow() {
+    public void testIsInLongMsgDataFlow() {
         final TubeSourceInfo tubeSourceInfo = new TubeSourceInfo(
                 "topic",
                 "address",
                 null,
-                new TDMsgCsvDeserializationInfo("tid", ',', false),
+                new InLongMsgCsvDeserializationInfo("tid", ',', false),
                 new FieldInfo[0]);
         final EmptySinkInfo sinkInfo = new EmptySinkInfo();
         final DataFlowInfo dataFlowInfo = new DataFlowInfo(1L, tubeSourceInfo, sinkInfo);
 
-        assertTrue(MultiTenancyTDMsgMixedDeserializer.isTDMsgDataFlow(dataFlowInfo));
+        assertTrue(MultiTenancyInLongMsgMixedDeserializer.isInLongMsgDataFlow(dataFlowInfo));
 
-        final DataFlowInfo nonTDMsgDataFlow = new DataFlowInfo(2L, new EmptySourceInfo(), sinkInfo);
-        assertFalse(MultiTenancyTDMsgMixedDeserializer.isTDMsgDataFlow(nonTDMsgDataFlow));
+        final DataFlowInfo nonInLongMsgDataFlow = new DataFlowInfo(2L, new EmptySourceInfo(), sinkInfo);
+        assertFalse(MultiTenancyInLongMsgMixedDeserializer.isInLongMsgDataFlow(nonInLongMsgDataFlow));
     }
 
     @Test
     public void testDeserialize() throws Exception {
-        final MultiTenancyTDMsgMixedDeserializer deserializer = new MultiTenancyTDMsgMixedDeserializer();
+        final MultiTenancyInLongMsgMixedDeserializer deserializer = new MultiTenancyInLongMsgMixedDeserializer();
         final FieldInfo stringField = new FieldInfo("not_important", new StringFormatInfo());
         final FieldInfo longField = new FieldInfo("id", new LongFormatInfo());
 
@@ -73,19 +73,19 @@ public class MultiTenancyTDMsgMixedDeserializerTest extends TestLogger {
                 "topic",
                 "address",
                 null,
-                new TDMsgCsvDeserializationInfo("tid", '|', false),
+                new InLongMsgCsvDeserializationInfo("tid", '|', false),
                 new FieldInfo[]{stringField, longField});
         final EmptySinkInfo sinkInfo = new EmptySinkInfo();
         final DataFlowInfo dataFlowInfo = new DataFlowInfo(1L, tubeSourceInfo, sinkInfo);
         deserializer.addDataFlow(dataFlowInfo);
 
-        final TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        final String attrs = "m=0&" + TDMsgUtils.TDMSG_ATTR_STREAM_ID + "=tid&t=20210513";
+        final InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        final String attrs = "m=0&" + InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID + "=tid&t=20210513";
         final String body1 = "tianqiwan|29";
-        tdMsg1.addMsg(attrs, body1.getBytes());
+        inLongMsg.addMsg(attrs, body1.getBytes());
 
         final TestingCollector<Record> collector = new TestingCollector<>();
-        deserializer.deserialize(new TDMsgMixedSerializedRecord("topic", 0, tdMsg1.buildArray()), collector);
+        deserializer.deserialize(new InLongMsgMixedSerializedRecord("topic", 0, inLongMsg.buildArray()), collector);
 
         assertEquals(1, collector.results.size());
         assertEquals(1L, collector.results.get(0).getDataflowId());
@@ -94,7 +94,7 @@ public class MultiTenancyTDMsgMixedDeserializerTest extends TestLogger {
         assertEquals(new Timestamp(time), collector.results.get(0).getRow().getField(0));
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("m", "0");
-        attributes.put(TDMsgUtils.TDMSG_ATTR_STREAM_ID, "tid");
+        attributes.put(InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID, "tid");
         attributes.put("t", "20210513");
         assertEquals(attributes, collector.results.get(0).getRow().getField(1));
         assertEquals("tianqiwan", collector.results.get(0).getRow().getField(2));
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/MultiTopicTubeSourceFunctionTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/MultiTopicTubeSourceFunctionTest.java
index 2821c2a..dc19a2d 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/MultiTopicTubeSourceFunctionTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/MultiTopicTubeSourceFunctionTest.java
@@ -27,7 +27,7 @@ import org.apache.inlong.sort.configuration.Constants;
 import org.apache.inlong.sort.flink.tubemq.MultiTopicTubeSourceFunction.SourceEvent;
 import org.apache.inlong.sort.flink.tubemq.MultiTopicTubeSourceFunction.SourceEventType;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
 import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
 import org.apache.inlong.sort.util.TestLogger;
 import java.util.ArrayList;
@@ -69,7 +69,7 @@ public class MultiTopicTubeSourceFunctionTest extends TestLogger {
                 topic,
                 masterAddress,
                 null,
-                new TDMsgCsvDeserializationInfo(tid, ',', true),
+                new InLongMsgCsvDeserializationInfo(tid, ',', true),
                 new FieldInfo[0]);
     }
 
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescriptionTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescriptionTest.java
index 32644b9..09f6a2c 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescriptionTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/tubemq/TubeSubscriptionDescriptionTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
 import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
 import org.apache.inlong.sort.util.TestLogger;
 import java.util.ArrayList;
@@ -50,7 +50,7 @@ public class TubeSubscriptionDescriptionTest extends TestLogger {
                 topic,
                 masterAddress,
                 null,
-                new TDMsgCsvDeserializationInfo(tid, ',', true),
+                new InLongMsgCsvDeserializationInfo(tid, ',', true),
                 new FieldInfo[0]);
         description.addDataFlow(dataFlowId, tubeSourceInfo);
 
@@ -68,7 +68,7 @@ public class TubeSubscriptionDescriptionTest extends TestLogger {
                 topic,
                 masterAddress,
                 null,
-                new TDMsgCsvDeserializationInfo(tid, ',', true),
+                new InLongMsgCsvDeserializationInfo(tid, ',', true),
                 new FieldInfo[0]);
         description.addDataFlow(dataFlowId, tubeSourceInfo1);
         final String tid2 = "10002";
@@ -77,7 +77,7 @@ public class TubeSubscriptionDescriptionTest extends TestLogger {
                 topic,
                 masterAddress,
                 consumerGroup,
-                new TDMsgCsvDeserializationInfo(tid2, ',', true),
+                new InLongMsgCsvDeserializationInfo(tid2, ',', true),
                 new FieldInfo[0]);
         final long dataFlowId2 = 10086L;
         description.addDataFlow(dataFlowId2, tubeSourceInfo2);
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/pom.xml b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
similarity index 97%
rename from inlong-sort/sort-formats/format-tdmsg-base/pom.xml
rename to inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
index 35447a9..d68f6b4 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/pom.xml
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
@@ -31,8 +31,8 @@
         <relativePath>..</relativePath>
     </parent>
 
-    <artifactId>sort-format-tdmsg-base</artifactId>
-    <name>Apache InLong - Sort Format-tdmsg-base</name>
+    <artifactId>sort-format-inlongmsg-base</artifactId>
+    <name>Apache InLong - Sort Format-inlongmsg-base</name>
     <packaging>jar</packaging>
 
     <dependencies>
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/AbstractTDMsgFormatDeserializer.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
similarity index 73%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/AbstractTDMsgFormatDeserializer.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
index 32d0d87..fddcd88 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/AbstractTDMsgFormatDeserializer.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
 
 import java.util.Arrays;
 import java.util.Iterator;
@@ -24,19 +24,19 @@ import java.util.Objects;
 import javax.annotation.Nonnull;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The base for all tdmsg format deserializers.
+ * The base for all inlongmsg format deserializers.
  */
-public abstract class AbstractTDMsgFormatDeserializer implements TableFormatDeserializer {
+public abstract class AbstractInLongMsgFormatDeserializer implements TableFormatDeserializer {
 
     private static final long serialVersionUID = 1L;
 
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractTDMsgFormatDeserializer.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractInLongMsgFormatDeserializer.class);
 
     /**
      * True if ignore errors in the deserialization.
@@ -44,39 +44,39 @@ public abstract class AbstractTDMsgFormatDeserializer implements TableFormatDese
     @Nonnull
     protected final Boolean ignoreErrors;
 
-    public AbstractTDMsgFormatDeserializer(@Nonnull Boolean ignoreErrors) {
+    public AbstractInLongMsgFormatDeserializer(@Nonnull Boolean ignoreErrors) {
         this.ignoreErrors = ignoreErrors;
     }
 
     /**
-     * Parses the head of the tdmsg record.
+     * Parses the head of the inlongmsg record.
      */
-    protected abstract TDMsgHead parseHead(String attr) throws Exception;
+    protected abstract InLongMsgHead parseHead(String attr) throws Exception;
 
     /**
-     * Parses the body of the tdmsg record.
+     * Parses the body of the inlongmsg record.
      */
-    protected abstract TDMsgBody parseBody(byte[] bytes) throws Exception;
+    protected abstract InLongMsgBody parseBody(byte[] bytes) throws Exception;
 
     /**
-     * Converts the tdmsg record into a row.
+     * Converts the inlongmsg record into a row.
      */
-    protected abstract Row convertRow(TDMsgHead head, TDMsgBody body) throws Exception;
+    protected abstract Row convertRow(InLongMsgHead head, InLongMsgBody body) throws Exception;
 
     @Override
     public void flatMap(
             byte[] bytes,
             Collector<Row> collector
     ) throws Exception {
-        TDMsg1 tdMsg = TDMsg1.parseFrom(bytes);
+        InLongMsg inLongMsg = InLongMsg.parseFrom(bytes);
 
-        for (String attr : tdMsg.getAttrs()) {
-            Iterator<byte[]> iterator = tdMsg.getIterator(attr);
+        for (String attr : inLongMsg.getAttrs()) {
+            Iterator<byte[]> iterator = inLongMsg.getIterator(attr);
             if (iterator == null) {
                 continue;
             }
 
-            TDMsgHead head;
+            InLongMsgHead head;
             try {
                 head = parseHead(attr);
             } catch (Exception e) {
@@ -95,7 +95,7 @@ public abstract class AbstractTDMsgFormatDeserializer implements TableFormatDese
                     continue;
                 }
 
-                TDMsgBody body;
+                InLongMsgBody body;
                 try {
                     body = parseBody(bodyBytes);
                 } catch (Exception e) {
@@ -113,7 +113,7 @@ public abstract class AbstractTDMsgFormatDeserializer implements TableFormatDese
                     row = convertRow(head, body);
                 } catch (Exception e) {
                     if (ignoreErrors) {
-                        LOG.warn("Cannot properly convert the tdmsg ({}, {}) " + "to row.", head, body, e);
+                        LOG.warn("Cannot properly convert the inlongmsg ({}, {}) " + "to row.", head, body, e);
                         continue;
                     } else {
                         throw e;
@@ -137,7 +137,7 @@ public abstract class AbstractTDMsgFormatDeserializer implements TableFormatDese
             return false;
         }
 
-        AbstractTDMsgFormatDeserializer that = (AbstractTDMsgFormatDeserializer) o;
+        AbstractInLongMsgFormatDeserializer that = (AbstractInLongMsgFormatDeserializer) o;
         return ignoreErrors.equals(that.ignoreErrors);
     }
 
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/AbstractTDMsgMixedFormatDeserializer.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatDeserializer.java
similarity index 74%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/AbstractTDMsgMixedFormatDeserializer.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatDeserializer.java
index 3329cc6..95d0a3a 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/AbstractTDMsgMixedFormatDeserializer.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatDeserializer.java
@@ -16,17 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
 
 import javax.annotation.Nonnull;
 
 /**
- * The base for all tdmsg mixed format deserializers.
+ * The base for all inlongmsg mixed format deserializers.
  */
-public abstract class AbstractTDMsgMixedFormatDeserializer
-        extends AbstractTDMsgFormatDeserializer {
+public abstract class AbstractInLongMsgMixedFormatDeserializer
+        extends AbstractInLongMsgFormatDeserializer {
 
-    public AbstractTDMsgMixedFormatDeserializer(@Nonnull Boolean ignoreErrors) {
+    public AbstractInLongMsgMixedFormatDeserializer(@Nonnull Boolean ignoreErrors) {
         super(ignoreErrors);
     }
 }
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgBody.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
similarity index 84%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgBody.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
index 3c204f1..bb00c35 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgBody.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
@@ -16,18 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
 
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
 
 /**
- * The body deserialized from {@link TDMsg1}.
+ * The body deserialized from {@link InLongMsg}.
  */
-public class TDMsgBody implements Serializable {
+public class InLongMsgBody implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
@@ -51,7 +51,7 @@ public class TDMsgBody implements Serializable {
      */
     private final Map<String, String> entries;
 
-    public TDMsgBody(
+    public InLongMsgBody(
             byte[] data,
             String tid,
             List<String> fields,
@@ -89,8 +89,8 @@ public class TDMsgBody implements Serializable {
             return false;
         }
 
-        TDMsgBody tdMsgBody = (TDMsgBody) o;
-        return Arrays.equals(data, tdMsgBody.data);
+        InLongMsgBody inLongMsgBody = (InLongMsgBody) o;
+        return Arrays.equals(data, inLongMsgBody.data);
     }
 
     @Override
@@ -100,7 +100,7 @@ public class TDMsgBody implements Serializable {
 
     @Override
     public String toString() {
-        return "TDMsgBody{" + "data=" + Arrays.toString(data) + ", tid='" + tid + '\''
+        return "InLongMsgBody{" + "data=" + Arrays.toString(data) + ", tid='" + tid + '\''
                + ", fields=" + fields + ", entries=" + entries + '}';
     }
 }
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgHead.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
similarity index 90%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgHead.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
index 662c8a0..a753457 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgHead.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
@@ -16,19 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
 
 import java.io.Serializable;
 import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import org.apache.inlong.commons.msg.TDMsg1;
+
+import org.apache.inlong.commons.msg.InLongMsg;
 
 /**
- * The head deserialized from {@link TDMsg1}.
+ * The head deserialized from {@link InLongMsg}.
  */
-public class TDMsgHead implements Serializable {
+public class InLongMsgHead implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
@@ -52,7 +53,7 @@ public class TDMsgHead implements Serializable {
      */
     private final List<String> predefinedFields;
 
-    public TDMsgHead(
+    public InLongMsgHead(
             Map<String, String> attributes,
             String tid,
             Timestamp time,
@@ -90,7 +91,7 @@ public class TDMsgHead implements Serializable {
             return false;
         }
 
-        TDMsgHead that = (TDMsgHead) o;
+        InLongMsgHead that = (InLongMsgHead) o;
         return Objects.equals(attributes, that.attributes)
                        && Objects.equals(tid, that.tid)
                        && Objects.equals(time, that.time)
@@ -104,7 +105,7 @@ public class TDMsgHead implements Serializable {
 
     @Override
     public String toString() {
-        return "TDMsgHead{"
+        return "InLongMsgHead{"
                        + "attributes=" + attributes
                        + ", tid='" + tid + '\''
                        + ", time=" + time
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedFormatConverter.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverter.java
similarity index 87%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedFormatConverter.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverter.java
index 55d3b16..25f6843 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedFormatConverter.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverter.java
@@ -16,16 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.types.Row;
 
 /**
- * The converter for a mixed tdmsg format.
+ * The converter for a mixed inlongmsg format.
  */
-public interface TDMsgMixedFormatConverter
+public interface InLongMsgMixedFormatConverter
         extends FlatMapFunction<Row, Row>, ResultTypeQueryable<Row> {
 
 }
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedFormatFactory.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatFactory.java
similarity index 79%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedFormatFactory.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatFactory.java
index 1f07eb9..48ed703 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedFormatFactory.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatFactory.java
@@ -16,25 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
 
 import java.util.Map;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
 
 /**
- * Factory for creating configured instances of {@link TDMsgMixedFormatConverter}.
+ * Factory for creating configured instances of {@link InLongMsgMixedFormatConverter}.
  */
-public interface TDMsgMixedFormatFactory {
+public interface InLongMsgMixedFormatFactory {
 
     /**
-     * Creates and configures a {@link TDMsgMixedFormatConverter} using the given
+     * Creates and configures a {@link InLongMsgMixedFormatConverter} using the given
      * properties.
      *
      * @param properties The normalized properties describing the format.
      * @return The configured serialization schema or null if the factory cannot
      *         provide an instance of the class.
      */
-    TDMsgMixedFormatConverter createMixedFormatConverter(
+    InLongMsgMixedFormatConverter createMixedFormatConverter(
             final Map<String, String> properties
     );
 
@@ -46,7 +46,7 @@ public interface TDMsgMixedFormatFactory {
      * @return The configured serialization schema or null if the factory cannot
      *         provide an instance of the class.
      */
-    AbstractTDMsgMixedFormatDeserializer createMixedFormatDeserializer(
+    AbstractInLongMsgMixedFormatDeserializer createMixedFormatDeserializer(
             final Map<String, String> properties
     );
 }
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedValidator.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedValidator.java
similarity index 91%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedValidator.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedValidator.java
index e26d2eb..1e9514c 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgMixedValidator.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedValidator.java
@@ -16,16 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
 
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.FormatDescriptorValidator;
 import org.apache.inlong.sort.formats.base.TableFormatConstants;
 
 /**
- * Validator for mixed tdmsg formats.
+ * Validator for mixed inlongmsg formats.
  */
-public class TDMsgMixedValidator extends FormatDescriptorValidator {
+public class InLongMsgMixedValidator extends FormatDescriptorValidator {
 
     @Override
     public void validate(DescriptorProperties properties) {
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgUtils.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
similarity index 90%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgUtils.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index 33d715f..e2b8e8f 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgUtils.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
 
 import static org.apache.flink.table.factories.TableFormatFactoryBase.deriveSchema;
 
@@ -36,7 +36,7 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
 import org.apache.inlong.sort.formats.base.TableFormatConstants;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.formats.common.FormatInfo;
@@ -44,24 +44,24 @@ import org.apache.inlong.sort.formats.common.RowFormatInfo;
 import org.apache.inlong.sort.formats.util.StringUtils;
 
 /**
- * A utility class for parsing TDMsg {@link TDMsg1}.
+ * A utility class for parsing InLongMsg {@link InLongMsg}.
  */
-public class TDMsgUtils {
+public class InLongMsgUtils {
 
-    public static final char TDMSG_ATTR_ENTRY_DELIMITER = '&';
-    public static final char TDMSG_ATTR_KV_DELIMITER = '=';
+    public static final char INLONGMSG_ATTR_ENTRY_DELIMITER = '&';
+    public static final char INLONGMSG_ATTR_KV_DELIMITER = '=';
 
     // keys in attributes
-    public static final String TDMSG_ATTR_STREAM_ID = "streamId";
-    public static final String TDMSG_ATTR_TIME_T = "t";
-    public static final String TDMSG_ATTR_TIME_DT = "dt";
-    public static final String TDMSG_ATTR_ADD_COLUMN_PREFIX = "__addcol";
+    public static final String INLONGMSG_ATTR_STREAM_ID = "streamId";
+    public static final String INLONGMSG_ATTR_TIME_T = "t";
+    public static final String INLONGMSG_ATTR_TIME_DT = "dt";
+    public static final String INLONGMSG_ATTR_ADD_COLUMN_PREFIX = "__addcol";
 
     public static final String FORMAT_TIME_FIELD_NAME = "format.time-field-name";
     public static final String FORMAT_ATTRIBUTES_FIELD_NAME = "format.attributes-field-name";
 
-    public static final String DEFAULT_TIME_FIELD_NAME = "tdmsg_time";
-    public static final String DEFAULT_ATTRIBUTES_FIELD_NAME = "tdmsg_attributes";
+    public static final String DEFAULT_TIME_FIELD_NAME = "inlongmsg_time";
+    public static final String DEFAULT_ATTRIBUTES_FIELD_NAME = "inlongmsg_attributes";
 
     public static final TypeInformation<Row> MIXED_ROW_TYPE =
             Types.ROW_NAMED(
@@ -111,8 +111,8 @@ public class TDMsgUtils {
     public static Map<String, String> parseAttr(String attr) {
         return StringUtils.splitKv(
                 attr,
-                TDMSG_ATTR_ENTRY_DELIMITER,
-                TDMSG_ATTR_KV_DELIMITER,
+                INLONGMSG_ATTR_ENTRY_DELIMITER,
+                INLONGMSG_ATTR_KV_DELIMITER,
                 null,
                 null
         );
@@ -154,15 +154,15 @@ public class TDMsgUtils {
     public static List<String> getPredefinedFields(Map<String, String> head) {
         Map<Integer, String> predefinedFields = new HashMap<>();
         for (String key : head.keySet()) {
-            if (!key.startsWith(TDMSG_ATTR_ADD_COLUMN_PREFIX)) {
+            if (!key.startsWith(INLONGMSG_ATTR_ADD_COLUMN_PREFIX)) {
                 continue;
             }
 
             int index =
                     Integer.parseInt(
                             key.substring(
-                                    TDMSG_ATTR_ADD_COLUMN_PREFIX.length(),
-                                    key.indexOf('_', TDMSG_ATTR_ADD_COLUMN_PREFIX.length())
+                                    INLONGMSG_ATTR_ADD_COLUMN_PREFIX.length(),
+                                    key.indexOf('_', INLONGMSG_ATTR_ADD_COLUMN_PREFIX.length())
                             )
                     );
 
@@ -179,8 +179,8 @@ public class TDMsgUtils {
     }
 
     public static Row buildMixedRow(
-            TDMsgHead head,
-            TDMsgBody body,
+            InLongMsgHead head,
+            InLongMsgBody body,
             String tid
     ) {
         Row row = new Row(7);
diff --git a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgValidator.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgValidator.java
similarity index 91%
rename from inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgValidator.java
rename to inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgValidator.java
index 486bf2f..c2a51ee 100644
--- a/inlong-sort/sort-formats/format-tdmsg-base/src/main/java/org/apache/inlong/sort/formats/tdmsg/TDMsgValidator.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgValidator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsg;
+package org.apache.inlong.sort.formats.inlongmsg;
 
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.FormatDescriptorValidator;
@@ -24,9 +24,9 @@ import org.apache.inlong.sort.formats.base.TableFormatConstants;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 
 /**
- * Validator for mixed tdmsg formats.
+ * Validator for mixed inlongmsg formats.
  */
-public class TDMsgValidator extends FormatDescriptorValidator {
+public class InLongMsgValidator extends FormatDescriptorValidator {
 
     @Override
     public void validate(DescriptorProperties properties) {
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/pom.xml b/inlong-sort/sort-formats/format-inlongmsg-csv/pom.xml
similarity index 96%
rename from inlong-sort/sort-formats/format-tdmsg-csv/pom.xml
rename to inlong-sort/sort-formats/format-inlongmsg-csv/pom.xml
index 712a0b2..0f341ec 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/pom.xml
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/pom.xml
@@ -31,8 +31,8 @@
         <relativePath>..</relativePath>
     </parent>
 
-    <artifactId>sort-format-tdmsg-csv</artifactId>
-    <name>Apache InLong - Sort Format-tdmsg-csv</name>
+    <artifactId>sort-format-inlongmsg-csv</artifactId>
+    <name>Apache InLong - Sort Format-inlongmsg-csv</name>
     <packaging>jar</packaging>
 
     <dependencies>
@@ -55,7 +55,7 @@
 
         <dependency>
             <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-format-tdmsg-base</artifactId>
+            <artifactId>sort-format-inlongmsg-base</artifactId>
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsv.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
similarity index 81%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsv.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
index ee5685b..f39ec6b 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsv.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
@@ -16,12 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
 
 import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
 
 import java.nio.charset.Charset;
 import java.util.Map;
@@ -36,14 +36,14 @@ import org.apache.inlong.sort.formats.base.TableFormatConstants;
  * for Comma-Separated Values (CSV) Files") proposed by the Internet Engineering
  * Task Force (IETF).
  */
-public class TDMsgCsv extends FormatDescriptor {
+public class InLongMsgCsv extends FormatDescriptor {
 
-    public static final String FORMAT_TYPE_VALUE = "tdmsgcsv";
+    public static final String FORMAT_TYPE_VALUE = "inlongmsgcsv";
 
     private DescriptorProperties internalProperties =
             new DescriptorProperties(true);
 
-    public TDMsgCsv() {
+    public InLongMsgCsv() {
         super(FORMAT_TYPE_VALUE, 1);
     }
 
@@ -52,7 +52,7 @@ public class TDMsgCsv extends FormatDescriptor {
      *
      * @param delimiter the field delimiter character
      */
-    public TDMsgCsv delimiter(char delimiter) {
+    public InLongMsgCsv delimiter(char delimiter) {
         internalProperties.putCharacter(TableFormatConstants.FORMAT_DELIMITER, delimiter);
         return this;
     }
@@ -62,7 +62,7 @@ public class TDMsgCsv extends FormatDescriptor {
      *
      * @param escapeCharacter escaping character (e.g. backslash).
      */
-    public TDMsgCsv escapeCharacter(char escapeCharacter) {
+    public InLongMsgCsv escapeCharacter(char escapeCharacter) {
         internalProperties
                 .putCharacter(TableFormatConstants.FORMAT_ESCAPE_CHARACTER, escapeCharacter);
         return this;
@@ -73,7 +73,7 @@ public class TDMsgCsv extends FormatDescriptor {
      *
      * @param quoteCharacter quoting character (e.g. quotation).
      */
-    public TDMsgCsv quoteCharacter(char quoteCharacter) {
+    public InLongMsgCsv quoteCharacter(char quoteCharacter) {
         internalProperties.putCharacter(TableFormatConstants.FORMAT_QUOTE_CHARACTER, quoteCharacter);
         return this;
     }
@@ -84,7 +84,7 @@ public class TDMsgCsv extends FormatDescriptor {
      *
      * @param nullLiteral null literal (e.g. "null" or "n/a")
      */
-    public TDMsgCsv nullLiteral(String nullLiteral) {
+    public InLongMsgCsv nullLiteral(String nullLiteral) {
         checkNotNull(nullLiteral);
         internalProperties.putString(TableFormatConstants.FORMAT_NULL_LITERAL, nullLiteral);
         return this;
@@ -95,7 +95,7 @@ public class TDMsgCsv extends FormatDescriptor {
      *
      * @param charset The charset of the text.
      */
-    public TDMsgCsv charset(Charset charset) {
+    public InLongMsgCsv charset(Charset charset) {
         checkNotNull(charset);
         internalProperties.putString(TableFormatConstants.FORMAT_CHARSET, charset.name());
         return this;
@@ -104,15 +104,15 @@ public class TDMsgCsv extends FormatDescriptor {
     /**
      * Retains the delimiter at the first character.
      */
-    public TDMsgCsv retainHeadDelimiter() {
-        internalProperties.putBoolean(TDMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER, false);
+    public InLongMsgCsv retainHeadDelimiter() {
+        internalProperties.putBoolean(InLongMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER, false);
         return this;
     }
 
     /**
      * Ignores the errors in the serialization and deserialization.
      */
-    public TDMsgCsv ignoreErrors() {
+    public InLongMsgCsv ignoreErrors() {
         internalProperties.putBoolean(TableFormatConstants.FORMAT_IGNORE_ERRORS, true);
         return this;
     }
@@ -122,7 +122,7 @@ public class TDMsgCsv extends FormatDescriptor {
      *
      * @param schema format schema string.
      */
-    public TDMsgCsv schema(String schema) {
+    public InLongMsgCsv schema(String schema) {
         checkNotNull(schema);
         internalProperties.putString(TableFormatConstants.FORMAT_SCHEMA, schema);
         return this;
@@ -139,7 +139,7 @@ public class TDMsgCsv extends FormatDescriptor {
      * field. A "from" definition is interpreted as a field renaming in the
      * format.
      */
-    public TDMsgCsv deriveSchema() {
+    public InLongMsgCsv deriveSchema() {
         internalProperties.putBoolean(FORMAT_DERIVE_SCHEMA, true);
         return this;
     }
@@ -149,7 +149,7 @@ public class TDMsgCsv extends FormatDescriptor {
      *
      * @param timeFieldName The name of the time field.
      */
-    public TDMsgCsv timeFieldName(String timeFieldName) {
+    public InLongMsgCsv timeFieldName(String timeFieldName) {
         checkNotNull(timeFieldName);
         internalProperties.putString(FORMAT_TIME_FIELD_NAME, timeFieldName);
         return this;
@@ -160,7 +160,7 @@ public class TDMsgCsv extends FormatDescriptor {
      *
      * @param attributesFieldName The name of the attributes field.
      */
-    public TDMsgCsv attributesFieldName(String attributesFieldName) {
+    public InLongMsgCsv attributesFieldName(String attributesFieldName) {
         checkNotNull(attributesFieldName);
         internalProperties.putString(FORMAT_ATTRIBUTES_FIELD_NAME, attributesFieldName);
         return this;
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializer.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
similarity index 79%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializer.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
index bb5351e..ca2bab0 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializer.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -16,10 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
 
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
 
 import java.util.Objects;
 import javax.annotation.Nonnull;
@@ -28,15 +28,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Row;
 import org.apache.inlong.sort.formats.base.TableFormatConstants;
 import org.apache.inlong.sort.formats.common.RowFormatInfo;
-import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgFormatDeserializer;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgBody;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgHead;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
 
 /**
- * The deserializer for the records in TDMsgCsv format.
+ * The deserializer for the records in InLongMsgCsv format.
  */
-public final class TDMsgCsvFormatDeserializer extends AbstractTDMsgFormatDeserializer {
+public final class InLongMsgCsvFormatDeserializer extends AbstractInLongMsgFormatDeserializer {
 
     private static final long serialVersionUID = 1L;
 
@@ -93,7 +93,7 @@ public final class TDMsgCsvFormatDeserializer extends AbstractTDMsgFormatDeseria
      */
     private final boolean deleteHeadDelimiter;
 
-    public TDMsgCsvFormatDeserializer(
+    public InLongMsgCsvFormatDeserializer(
             @Nonnull RowFormatInfo rowFormatInfo,
             @Nonnull String timeFieldName,
             @Nonnull String attributesFieldName,
@@ -118,7 +118,7 @@ public final class TDMsgCsvFormatDeserializer extends AbstractTDMsgFormatDeseria
         this.deleteHeadDelimiter = deleteHeadDelimiter;
     }
 
-    public TDMsgCsvFormatDeserializer(
+    public InLongMsgCsvFormatDeserializer(
             @Nonnull RowFormatInfo rowFormatInfo
     ) {
         this(
@@ -130,24 +130,24 @@ public final class TDMsgCsvFormatDeserializer extends AbstractTDMsgFormatDeseria
                 null,
                 null,
                 null,
-                TDMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER,
+                InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER,
                 TableFormatConstants.DEFAULT_IGNORE_ERRORS
         );
     }
 
     @Override
     public TypeInformation<Row> getProducedType() {
-        return TDMsgUtils.buildRowType(timeFieldName, attributesFieldName, rowFormatInfo);
+        return InLongMsgUtils.buildRowType(timeFieldName, attributesFieldName, rowFormatInfo);
     }
 
     @Override
-    protected TDMsgHead parseHead(String attr) {
-        return TDMsgCsvUtils.parseHead(attr);
+    protected InLongMsgHead parseHead(String attr) {
+        return InLongMsgCsvUtils.parseHead(attr);
     }
 
     @Override
-    protected TDMsgBody parseBody(byte[] bytes) {
-        return TDMsgCsvUtils.parseBody(
+    protected InLongMsgBody parseBody(byte[] bytes) {
+        return InLongMsgCsvUtils.parseBody(
                 bytes,
                 charset,
                 delimiter,
@@ -158,8 +158,8 @@ public final class TDMsgCsvFormatDeserializer extends AbstractTDMsgFormatDeseria
     }
 
     @Override
-    protected Row convertRow(TDMsgHead head, TDMsgBody body) {
-        return TDMsgCsvUtils.buildRow(
+    protected Row convertRow(InLongMsgHead head, InLongMsgBody body) {
+        return InLongMsgCsvUtils.buildRow(
                 rowFormatInfo,
                 nullLiteral,
                 head.getTime(),
@@ -183,7 +183,7 @@ public final class TDMsgCsvFormatDeserializer extends AbstractTDMsgFormatDeseria
             return false;
         }
 
-        TDMsgCsvFormatDeserializer that = (TDMsgCsvFormatDeserializer) o;
+        InLongMsgCsvFormatDeserializer that = (InLongMsgCsvFormatDeserializer) o;
         return deleteHeadDelimiter == that.deleteHeadDelimiter
                        && rowFormatInfo.equals(that.rowFormatInfo)
                        && timeFieldName.equals(that.timeFieldName)
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatFactory.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
similarity index 79%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatFactory.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
index 144d2af..ccbd9a6 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatFactory.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
 
 import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
 import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
@@ -27,11 +27,11 @@ import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ES
 import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
 import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
 import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.FORMAT_TIME_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.getDataFormatInfo;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.validateFieldNames;
-import static org.apache.inlong.sort.formats.tdmsgcsv.TDMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getDataFormatInfo;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateFieldNames;
+import static org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -43,21 +43,21 @@ import org.apache.inlong.sort.formats.base.TableFormatConstants;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory;
 import org.apache.inlong.sort.formats.common.RowFormatInfo;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedFormatFactory;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedValidator;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgValidator;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedValidator;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgValidator;
 
 /**
- * Table format factory for providing configured instances of TDMsgCsv-to-row
+ * Table format factory for providing configured instances of InLongMsgCsv-to-row
  * serializer and deserializer.
  */
-public final class TDMsgCsvFormatFactory
+public final class InLongMsgCsvFormatFactory
         extends TableFormatFactoryBase<Row>
-        implements TableFormatDeserializerFactory, TDMsgMixedFormatFactory {
+        implements TableFormatDeserializerFactory, InLongMsgMixedFormatFactory {
 
-    public TDMsgCsvFormatFactory() {
-        super(TDMsgCsv.FORMAT_TYPE_VALUE, 1, true);
+    public InLongMsgCsvFormatFactory() {
+        super(InLongMsgCsv.FORMAT_TYPE_VALUE, 1, true);
     }
 
     @Override
@@ -84,7 +84,7 @@ public final class TDMsgCsvFormatFactory
                 new DescriptorProperties(true);
         descriptorProperties.putProperties(properties);
 
-        final TDMsgValidator validator = new TDMsgValidator();
+        final InLongMsgValidator validator = new InLongMsgValidator();
         validator.validate(descriptorProperties);
 
         RowFormatInfo rowFormatInfo = getDataFormatInfo(descriptorProperties);
@@ -92,11 +92,11 @@ public final class TDMsgCsvFormatFactory
         String timeFieldName =
                 descriptorProperties
                         .getOptionalString(FORMAT_TIME_FIELD_NAME)
-                        .orElse(TDMsgUtils.DEFAULT_TIME_FIELD_NAME);
+                        .orElse(InLongMsgUtils.DEFAULT_TIME_FIELD_NAME);
         String attributesFieldName =
                 descriptorProperties
                         .getOptionalString(FORMAT_ATTRIBUTES_FIELD_NAME)
-                        .orElse(TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME);
+                        .orElse(InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME);
 
         validateFieldNames(timeFieldName, attributesFieldName, rowFormatInfo);
 
@@ -123,13 +123,13 @@ public final class TDMsgCsvFormatFactory
         Boolean deleteHeadDelimiter =
                 descriptorProperties
                         .getOptionalBoolean(FORMAT_DELETE_HEAD_DELIMITER)
-                        .orElse(TDMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER);
+                        .orElse(InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER);
         boolean ignoreErrors =
                 descriptorProperties
                         .getOptionalBoolean(FORMAT_IGNORE_ERRORS)
                         .orElse(DEFAULT_IGNORE_ERRORS);
 
-        return new TDMsgCsvFormatDeserializer(
+        return new InLongMsgCsvFormatDeserializer(
                 rowFormatInfo,
                 timeFieldName,
                 attributesFieldName,
@@ -144,14 +144,14 @@ public final class TDMsgCsvFormatFactory
     }
 
     @Override
-    public TDMsgCsvMixedFormatDeserializer createMixedFormatDeserializer(
+    public InLongMsgCsvMixedFormatDeserializer createMixedFormatDeserializer(
             Map<String, String> properties
     ) {
         final DescriptorProperties descriptorProperties =
                 new DescriptorProperties(true);
         descriptorProperties.putProperties(properties);
 
-        final TDMsgMixedValidator validator = new TDMsgMixedValidator();
+        final InLongMsgMixedValidator validator = new InLongMsgMixedValidator();
         validator.validate(descriptorProperties);
 
         String charset =
@@ -173,13 +173,13 @@ public final class TDMsgCsvFormatFactory
         Boolean deleteHeadDelimiter =
                 descriptorProperties
                         .getOptionalBoolean(FORMAT_DELETE_HEAD_DELIMITER)
-                        .orElse(TDMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER);
+                        .orElse(InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER);
         boolean ignoreErrors =
                 descriptorProperties
                         .getOptionalBoolean(FORMAT_IGNORE_ERRORS)
                         .orElse(DEFAULT_IGNORE_ERRORS);
 
-        return new TDMsgCsvMixedFormatDeserializer(
+        return new InLongMsgCsvMixedFormatDeserializer(
                 charset,
                 delimiter,
                 escapeCharacter,
@@ -190,7 +190,7 @@ public final class TDMsgCsvFormatFactory
     }
 
     @Override
-    public TDMsgCsvMixedFormatConverter createMixedFormatConverter(
+    public InLongMsgCsvMixedFormatConverter createMixedFormatConverter(
             Map<String, String> properties
     ) {
         final DescriptorProperties descriptorProperties =
@@ -202,11 +202,11 @@ public final class TDMsgCsvFormatFactory
         String timeFieldName =
                 descriptorProperties
                         .getOptionalString(FORMAT_TIME_FIELD_NAME)
-                        .orElse(TDMsgUtils.DEFAULT_TIME_FIELD_NAME);
+                        .orElse(InLongMsgUtils.DEFAULT_TIME_FIELD_NAME);
         String attributesFieldName =
                 descriptorProperties
                         .getOptionalString(FORMAT_ATTRIBUTES_FIELD_NAME)
-                        .orElse(TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME);
+                        .orElse(InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME);
 
         validateFieldNames(timeFieldName, attributesFieldName, rowFormatInfo);
 
@@ -219,7 +219,7 @@ public final class TDMsgCsvFormatFactory
                         .getOptionalBoolean(FORMAT_IGNORE_ERRORS)
                         .orElse(DEFAULT_IGNORE_ERRORS);
 
-        return new TDMsgCsvMixedFormatConverter(
+        return new InLongMsgCsvMixedFormatConverter(
                 rowFormatInfo,
                 timeFieldName,
                 attributesFieldName,
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvMixedFormatConverter.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
similarity index 75%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvMixedFormatConverter.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
index d0e5e2e..6bc995c 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvMixedFormatConverter.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
 
 import java.sql.Timestamp;
 import java.util.List;
@@ -27,19 +27,19 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.sort.formats.common.RowFormatInfo;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedFormatConverter;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverter;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Converter used to deserialize a mixed row in tdmsg-csv format.
+ * Converter used to deserialize a mixed row in inlongmsg-csv format.
  */
-public class TDMsgCsvMixedFormatConverter implements TDMsgMixedFormatConverter {
+public class InLongMsgCsvMixedFormatConverter implements InLongMsgMixedFormatConverter {
 
     private static final long serialVersionUID = 1L;
 
-    private static final Logger LOG = LoggerFactory.getLogger(TDMsgCsvMixedFormatConverter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(InLongMsgCsvMixedFormatConverter.class);
 
     /**
      * The schema of the rows.
@@ -69,7 +69,7 @@ public class TDMsgCsvMixedFormatConverter implements TDMsgMixedFormatConverter {
      */
     private final boolean ignoreErrors;
 
-    public TDMsgCsvMixedFormatConverter(
+    public InLongMsgCsvMixedFormatConverter(
             @Nonnull RowFormatInfo rowFormatInfo,
             @Nonnull String timeFieldName,
             @Nonnull String attributesFieldName,
@@ -85,7 +85,7 @@ public class TDMsgCsvMixedFormatConverter implements TDMsgMixedFormatConverter {
 
     @Override
     public TypeInformation<Row> getProducedType() {
-        return TDMsgUtils.buildRowType(timeFieldName, attributesFieldName, rowFormatInfo);
+        return InLongMsgUtils.buildRowType(timeFieldName, attributesFieldName, rowFormatInfo);
     }
 
     @Override
@@ -94,12 +94,12 @@ public class TDMsgCsvMixedFormatConverter implements TDMsgMixedFormatConverter {
         Row row;
 
         try {
-            Timestamp time = TDMsgUtils.getTimeFromMixedRow(mixedRow);
-            Map<String, String> attributes = TDMsgUtils.getAttributesFromMixedRow(mixedRow);
-            List<String> predefinedFields = TDMsgUtils.getPredefinedFieldsFromMixedRow(mixedRow);
-            List<String> fields = TDMsgUtils.getFieldsFromMixedRow(mixedRow);
+            Timestamp time = InLongMsgUtils.getTimeFromMixedRow(mixedRow);
+            Map<String, String> attributes = InLongMsgUtils.getAttributesFromMixedRow(mixedRow);
+            List<String> predefinedFields = InLongMsgUtils.getPredefinedFieldsFromMixedRow(mixedRow);
+            List<String> fields = InLongMsgUtils.getFieldsFromMixedRow(mixedRow);
 
-            row = TDMsgCsvUtils.buildRow(rowFormatInfo, nullLiteral, time, attributes,
+            row = InLongMsgCsvUtils.buildRow(rowFormatInfo, nullLiteral, time, attributes,
                     predefinedFields, fields);
         } catch (Exception e) {
             if (ignoreErrors) {
@@ -124,7 +124,7 @@ public class TDMsgCsvMixedFormatConverter implements TDMsgMixedFormatConverter {
             return false;
         }
 
-        TDMsgCsvMixedFormatConverter that = (TDMsgCsvMixedFormatConverter) o;
+        InLongMsgCsvMixedFormatConverter that = (InLongMsgCsvMixedFormatConverter) o;
         return ignoreErrors == that.ignoreErrors
                        && rowFormatInfo.equals(that.rowFormatInfo)
                        && timeFieldName.equals(that.timeFieldName)
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvMixedFormatDeserializer.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
similarity index 75%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvMixedFormatDeserializer.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
index 2352fde..3d20422 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvMixedFormatDeserializer.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
 
 import java.util.Objects;
 import javax.annotation.Nonnull;
@@ -24,15 +24,15 @@ import javax.annotation.Nullable;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Row;
 import org.apache.inlong.sort.formats.base.TableFormatConstants;
-import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgMixedFormatDeserializer;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgBody;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgHead;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
 
 /**
- * The deserializer for the records in TDMsgCsv format.
+ * The deserializer for the records in InLongMsgCsv format.
  */
-public final class TDMsgCsvMixedFormatDeserializer extends AbstractTDMsgMixedFormatDeserializer {
+public final class InLongMsgCsvMixedFormatDeserializer extends AbstractInLongMsgMixedFormatDeserializer {
 
     private static final long serialVersionUID = 1L;
 
@@ -65,7 +65,7 @@ public final class TDMsgCsvMixedFormatDeserializer extends AbstractTDMsgMixedFor
      */
     private final boolean deleteHeadDelimiter;
 
-    public TDMsgCsvMixedFormatDeserializer(
+    public InLongMsgCsvMixedFormatDeserializer(
             @Nonnull String charset,
             @Nonnull Character delimiter,
             @Nullable Character escapeChar,
@@ -82,30 +82,30 @@ public final class TDMsgCsvMixedFormatDeserializer extends AbstractTDMsgMixedFor
         this.deleteHeadDelimiter = deleteHeadDelimiter;
     }
 
-    public TDMsgCsvMixedFormatDeserializer() {
+    public InLongMsgCsvMixedFormatDeserializer() {
         this(
                 TableFormatConstants.DEFAULT_CHARSET,
                 TableFormatConstants.DEFAULT_DELIMITER,
                 null,
                 null,
-                TDMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER,
+                InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER,
                 TableFormatConstants.DEFAULT_IGNORE_ERRORS
         );
     }
 
     @Override
     public TypeInformation<Row> getProducedType() {
-        return TDMsgUtils.MIXED_ROW_TYPE;
+        return InLongMsgUtils.MIXED_ROW_TYPE;
     }
 
     @Override
-    protected TDMsgHead parseHead(String attr) {
-        return TDMsgCsvUtils.parseHead(attr);
+    protected InLongMsgHead parseHead(String attr) {
+        return InLongMsgCsvUtils.parseHead(attr);
     }
 
     @Override
-    protected TDMsgBody parseBody(byte[] bytes) {
-        return TDMsgCsvUtils.parseBody(
+    protected InLongMsgBody parseBody(byte[] bytes) {
+        return InLongMsgCsvUtils.parseBody(
                 bytes,
                 charset,
                 delimiter,
@@ -116,8 +116,8 @@ public final class TDMsgCsvMixedFormatDeserializer extends AbstractTDMsgMixedFor
     }
 
     @Override
-    protected Row convertRow(TDMsgHead head, TDMsgBody body) {
-        return TDMsgUtils.buildMixedRow(head, body, head.getTid());
+    protected Row convertRow(InLongMsgHead head, InLongMsgBody body) {
+        return InLongMsgUtils.buildMixedRow(head, body, head.getTid());
     }
 
     @Override
@@ -134,7 +134,7 @@ public final class TDMsgCsvMixedFormatDeserializer extends AbstractTDMsgMixedFor
             return false;
         }
 
-        TDMsgCsvMixedFormatDeserializer that = (TDMsgCsvMixedFormatDeserializer) o;
+        InLongMsgCsvMixedFormatDeserializer that = (InLongMsgCsvMixedFormatDeserializer) o;
         return deleteHeadDelimiter == that.deleteHeadDelimiter
                        && charset.equals(that.charset)
                        && delimiter.equals(that.delimiter)
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvUtils.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
similarity index 73%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvUtils.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index f935e6d..84c32bb 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvUtils.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -16,15 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
 
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_STREAM_ID;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_TIME_DT;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_TIME_T;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.getPredefinedFields;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.parseAttr;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.parseDateTime;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.parseEpochTime;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
 
 import java.nio.charset.Charset;
 import java.sql.Timestamp;
@@ -36,57 +36,57 @@ import org.apache.flink.types.Row;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.formats.common.RowFormatInfo;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgBody;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgHead;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
 import org.apache.inlong.sort.formats.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Utilities for {@link TDMsgCsv}.
+ * Utilities for {@link InLongMsgCsv}.
  */
-public class TDMsgCsvUtils {
+public class InLongMsgCsvUtils {
 
-    private static final Logger LOG = LoggerFactory.getLogger(TDMsgUtils.class);
+    private static final Logger LOG = LoggerFactory.getLogger(InLongMsgUtils.class);
 
     public static final String FORMAT_DELETE_HEAD_DELIMITER = "format.delete-head-delimiter";
     public static final boolean DEFAULT_DELETE_HEAD_DELIMITER = true;
 
-    public static TDMsgHead parseHead(String attr) {
+    public static InLongMsgHead parseHead(String attr) {
         Map<String, String> attributes = parseAttr(attr);
 
         // Extracts interface from the attributes.
         String streamId;
 
-        if (attributes.containsKey(TDMSG_ATTR_STREAM_ID)) {
-            streamId = attributes.get(TDMSG_ATTR_STREAM_ID);
+        if (attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) {
+            streamId = attributes.get(INLONGMSG_ATTR_STREAM_ID);
         } else {
-            throw new IllegalArgumentException("Could not find " + TDMSG_ATTR_STREAM_ID + " in attributes!");
+            throw new IllegalArgumentException("Could not find " + INLONGMSG_ATTR_STREAM_ID + " in attributes!");
         }
 
         // Extracts time from the attributes
         Timestamp time;
 
-        if (attributes.containsKey(TDMSG_ATTR_TIME_T)) {
-            String date = attributes.get(TDMSG_ATTR_TIME_T).trim();
+        if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
+            String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim();
             time = parseDateTime(date);
-        } else if (attributes.containsKey(TDMSG_ATTR_TIME_DT)) {
-            String epoch = attributes.get(TDMSG_ATTR_TIME_DT).trim();
+        } else if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) {
+            String epoch = attributes.get(INLONGMSG_ATTR_TIME_DT).trim();
             time = parseEpochTime(epoch);
         } else {
             throw new IllegalArgumentException(
-                    "Could not find " + TDMSG_ATTR_TIME_T
-                            + " or " + TDMSG_ATTR_TIME_DT + " in attributes!");
+                    "Could not find " + INLONGMSG_ATTR_TIME_T
+                            + " or " + INLONGMSG_ATTR_TIME_DT + " in attributes!");
         }
 
         // Extracts predefined fields from the attributes
         List<String> predefinedFields = getPredefinedFields(attributes);
 
-        return new TDMsgHead(attributes, streamId, time, predefinedFields);
+        return new InLongMsgHead(attributes, streamId, time, predefinedFields);
     }
 
-    public static TDMsgBody parseBody(
+    public static InLongMsgBody parseBody(
             byte[] bytes,
             String charset,
             char delimiter,
@@ -104,7 +104,7 @@ public class TDMsgCsvUtils {
 
         String[] fieldTexts = StringUtils.splitCsv(bodyText, delimiter, escapeChar, quoteChar);
 
-        return new TDMsgBody(
+        return new InLongMsgBody(
                 bytes,
                 null,
                 Arrays.asList(fieldTexts),
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
similarity index 91%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 3cd93f0..52a23d9 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.inlong.sort.formats.tdmsgcsv.TDMsgCsvFormatFactory
+org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvFormatFactory
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
similarity index 73%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializerTest.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
index 91026f7..9c93c14 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatDeserializerTest.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
@@ -16,11 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
 
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.TDMSG_ATTR_STREAM_ID;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
 import static org.junit.Assert.assertEquals;
 
 import java.nio.charset.Charset;
@@ -36,7 +36,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
-import org.apache.inlong.commons.msg.TDMsg1;
+import org.apache.inlong.commons.msg.InLongMsg;
 import org.apache.inlong.sort.formats.base.TableFormatConstants;
 import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.formats.common.IntFormatInfo;
@@ -45,9 +45,9 @@ import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.junit.Test;
 
 /**
- * Unit tests for {@link TDMsgCsvFormatDeserializer}.
+ * Unit tests for {@link InLongMsgCsvFormatDeserializer}.
  */
-public class TDMsgCsvFormatDeserializerTest {
+public class InLongMsgCsvFormatDeserializerTest {
 
     private static final RowFormatInfo TEST_ROW_INFO =
             new RowFormatInfo(
@@ -64,8 +64,8 @@ public class TDMsgCsvFormatDeserializerTest {
 
     @Test
     public void testRowType() {
-        TDMsgCsvFormatDeserializer deserializer =
-                new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
+        InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer(TEST_ROW_INFO);
 
         TypeInformation<Row> expectedRowType =
                 Types.ROW_NAMED(
@@ -95,19 +95,19 @@ public class TDMsgCsvFormatDeserializerTest {
     @Test
     public void testNormal() throws Exception {
 
-        final TDMsgCsvFormatDeserializer deserializer =
-                new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
+        final InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer(TEST_ROW_INFO);
 
-        TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
         String body1 = "123,field11,field12,field13";
         String body2 = "123,field21,field22,field23";
-        tdMsg1.addMsg(attrs, body1.getBytes());
-        tdMsg1.addMsg(attrs, body2.getBytes());
+        inLongMsg.addMsg(attrs, body1.getBytes());
+        inLongMsg.addMsg(attrs, body2.getBytes());
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+        expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
         expectedAttributes.put("__addcol1__", "1");
         expectedAttributes.put("__addcol2__", "2");
@@ -136,7 +136,7 @@ public class TDMsgCsvFormatDeserializerTest {
 
         testRowDeserialization(
                 deserializer,
-                tdMsg1.buildArray(),
+                inLongMsg.buildArray(),
                 Arrays.asList(expectedRow1, expectedRow2)
         );
     }
@@ -144,19 +144,19 @@ public class TDMsgCsvFormatDeserializerTest {
     @Test
     public void testEmptyField() throws Exception {
 
-        final TDMsgCsvFormatDeserializer deserializer =
-                new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
+        final InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer(TEST_ROW_INFO);
 
-        TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
         String body1 = "123,field11,field12,";
         String body2 = "123,field21,,field23";
-        tdMsg1.addMsg(attrs, body1.getBytes());
-        tdMsg1.addMsg(attrs, body2.getBytes());
+        inLongMsg.addMsg(attrs, body1.getBytes());
+        inLongMsg.addMsg(attrs, body2.getBytes());
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+        expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
         expectedAttributes.put("__addcol1__", "1");
         expectedAttributes.put("__addcol2__", "2");
@@ -185,7 +185,7 @@ public class TDMsgCsvFormatDeserializerTest {
 
         testRowDeserialization(
                 deserializer,
-                tdMsg1.buildArray(),
+                inLongMsg.buildArray(),
                 Arrays.asList(expectedRow1, expectedRow2)
         );
     }
@@ -193,19 +193,19 @@ public class TDMsgCsvFormatDeserializerTest {
     @Test
     public void testNoPredefinedFields() throws Exception {
 
-        final TDMsgCsvFormatDeserializer deserializer =
-                new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
+        final InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer(TEST_ROW_INFO);
 
-        TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322";
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322";
         String body1 = "1,2,123,field11,field12,";
         String body2 = "1,2,123,field21,,field23";
-        tdMsg1.addMsg(attrs, body1.getBytes());
-        tdMsg1.addMsg(attrs, body2.getBytes());
+        inLongMsg.addMsg(attrs, body1.getBytes());
+        inLongMsg.addMsg(attrs, body2.getBytes());
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+        expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
 
         Row expectedRow1 = Row.of(
@@ -232,15 +232,15 @@ public class TDMsgCsvFormatDeserializerTest {
 
         testRowDeserialization(
                 deserializer,
-                tdMsg1.buildArray(),
+                inLongMsg.buildArray(),
                 Arrays.asList(expectedRow1, expectedRow2)
         );
     }
 
     @Test
     public void testIgnoreAttributeErrors() throws Exception {
-        TDMsgCsvFormatDeserializer deserializer =
-                new TDMsgCsvFormatDeserializer(
+        InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer(
                         TEST_ROW_INFO,
                         DEFAULT_TIME_FIELD_NAME,
                         DEFAULT_ATTRIBUTES_FIELD_NAME,
@@ -253,22 +253,22 @@ public class TDMsgCsvFormatDeserializerTest {
                         true
                 );
 
-        TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
         String attrs = "m=0&&&&";
         String body1 = "123,field11,field12,field13";
-        tdMsg1.addMsg(attrs, body1.getBytes());
+        inLongMsg.addMsg(attrs, body1.getBytes());
 
         testRowDeserialization(
                 deserializer,
-                tdMsg1.buildArray(),
+                inLongMsg.buildArray(),
                 Collections.emptyList()
         );
     }
 
     @Test
     public void testIgnoreBodyErrors() throws Exception {
-        final TDMsgCsvFormatDeserializer deserializer =
-                new TDMsgCsvFormatDeserializer(
+        final InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer(
                         TEST_ROW_INFO,
                         DEFAULT_TIME_FIELD_NAME,
                         DEFAULT_ATTRIBUTES_FIELD_NAME,
@@ -281,16 +281,16 @@ public class TDMsgCsvFormatDeserializerTest {
                         true
                 );
 
-        TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
         String body1 = "aaa,field11,field12,field13";
         String body2 = "123,field21,field22,field23";
-        tdMsg1.addMsg(attrs, body1.getBytes());
-        tdMsg1.addMsg(attrs, body2.getBytes());
+        inLongMsg.addMsg(attrs, body1.getBytes());
+        inLongMsg.addMsg(attrs, body2.getBytes());
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+        expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
         expectedAttributes.put("__addcol1__", "1");
         expectedAttributes.put("__addcol2__", "2");
@@ -308,15 +308,15 @@ public class TDMsgCsvFormatDeserializerTest {
 
         testRowDeserialization(
                 deserializer,
-                tdMsg1.buildArray(),
+                inLongMsg.buildArray(),
                 Collections.singletonList(expectedRow2)
         );
     }
 
     @Test
     public void testDeleteHeadDelimiter() throws Exception {
-        final TDMsgCsvFormatDeserializer deserializer =
-                new TDMsgCsvFormatDeserializer(
+        final InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer(
                         TEST_ROW_INFO,
                         DEFAULT_TIME_FIELD_NAME,
                         DEFAULT_ATTRIBUTES_FIELD_NAME,
@@ -329,15 +329,15 @@ public class TDMsgCsvFormatDeserializerTest {
                         true
                 );
 
-        TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322";
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322";
         String body = ",1,2,3,field1,field2,field3";
 
-        tdMsg1.addMsg(attrs, body.getBytes());
+        inLongMsg.addMsg(attrs, body.getBytes());
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+        expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
 
         Row expectedRow = Row.of(
@@ -353,15 +353,15 @@ public class TDMsgCsvFormatDeserializerTest {
 
         testRowDeserialization(
                 deserializer,
-                tdMsg1.buildArray(),
+                inLongMsg.buildArray(),
                 Collections.singletonList(expectedRow)
         );
     }
 
     @Test
     public void testRetainHeadDelimiter() throws Exception {
-        final TDMsgCsvFormatDeserializer deserializer =
-                new TDMsgCsvFormatDeserializer(
+        final InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer(
                         TEST_ROW_INFO,
                         DEFAULT_TIME_FIELD_NAME,
                         DEFAULT_ATTRIBUTES_FIELD_NAME,
@@ -374,15 +374,15 @@ public class TDMsgCsvFormatDeserializerTest {
                         false
                 );
 
-        TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322";
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322";
         String body = ",1,2,field1,field2,field3";
 
-        tdMsg1.addMsg(attrs, body.getBytes());
+        inLongMsg.addMsg(attrs, body.getBytes());
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+        expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
 
         Row expectedRow = Row.of(
@@ -398,26 +398,26 @@ public class TDMsgCsvFormatDeserializerTest {
 
         testRowDeserialization(
                 deserializer,
-                tdMsg1.buildArray(),
+                inLongMsg.buildArray(),
                 Collections.singletonList(expectedRow)
         );
     }
 
     @Test
     public void testUnmatchedFields1() throws Exception {
-        final TDMsgCsvFormatDeserializer deserializer =
-                new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
+        final InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer(TEST_ROW_INFO);
 
-        TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
         String body1 = "123,field11,field12";
         String body2 = "123,field21,field22,field23,field24";
-        tdMsg1.addMsg(attrs, body1.getBytes());
-        tdMsg1.addMsg(attrs, body2.getBytes());
+        inLongMsg.addMsg(attrs, body1.getBytes());
+        inLongMsg.addMsg(attrs, body2.getBytes());
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+        expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
         expectedAttributes.put("__addcol1__", "1");
         expectedAttributes.put("__addcol2__", "2");
@@ -446,25 +446,25 @@ public class TDMsgCsvFormatDeserializerTest {
 
         testRowDeserialization(
                 deserializer,
-                tdMsg1.buildArray(),
+                inLongMsg.buildArray(),
                 Arrays.asList(expectedRow1, expectedRow2)
         );
     }
 
     @Test
     public void testUnmatchedFields2() throws Exception {
-        final TDMsgCsvFormatDeserializer deserializer =
-                new TDMsgCsvFormatDeserializer(TEST_ROW_INFO);
+        final InLongMsgCsvFormatDeserializer deserializer =
+                new InLongMsgCsvFormatDeserializer(TEST_ROW_INFO);
 
-        TDMsg1 tdMsg1 = TDMsg1.newTDMsg();
-        String attrs = "m=0&" + TDMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&"
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        String attrs = "m=0&" + INLONGMSG_ATTR_STREAM_ID + "=testInterfaceId&t=20200322&__addcol1__=1&"
                + "__addcol2__=2&__addcol3__=3&__addcol4__=4&__addcol5__=5&__addcol6__=6&__addcol7__=7";
         String body = "field11,field12";
-        tdMsg1.addMsg(attrs, body.getBytes());
+        inLongMsg.addMsg(attrs, body.getBytes());
 
         Map<String, String> expectedAttributes = new HashMap<>();
         expectedAttributes.put("m", "0");
-        expectedAttributes.put(TDMSG_ATTR_STREAM_ID, "testInterfaceId");
+        expectedAttributes.put(INLONGMSG_ATTR_STREAM_ID, "testInterfaceId");
         expectedAttributes.put("t", "20200322");
         expectedAttributes.put("__addcol1__", "1");
         expectedAttributes.put("__addcol2__", "2");
@@ -487,13 +487,13 @@ public class TDMsgCsvFormatDeserializerTest {
 
         testRowDeserialization(
                 deserializer,
-                tdMsg1.buildArray(),
+                inLongMsg.buildArray(),
                 Collections.singletonList(expectedRow)
         );
     }
 
     private void testRowDeserialization(
-            TDMsgCsvFormatDeserializer deserializer,
+            InLongMsgCsvFormatDeserializer deserializer,
             byte[] bytes,
             List<Row> expectedRows
     ) throws Exception {
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatFactoryTest.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
similarity index 86%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatFactoryTest.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
index 3c86df8..f3bff15 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvFormatFactoryTest.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -37,13 +37,13 @@ import org.apache.inlong.sort.formats.common.FormatUtils;
 import org.apache.inlong.sort.formats.common.IntFormatInfo;
 import org.apache.inlong.sort.formats.common.RowFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
 import org.junit.Test;
 
 /**
- * Tests for {@link TDMsgCsvFormatFactory}.
+ * Tests for {@link InLongMsgCsvFormatFactory}.
  */
-public class TDMsgCsvFormatFactoryTest {
+public class InLongMsgCsvFormatFactoryTest {
 
     private static final TypeInformation<Row> SCHEMA =
             Types.ROW(
@@ -70,7 +70,7 @@ public class TDMsgCsvFormatFactoryTest {
     @Test
     public void testCreateTableFormatDeserializer() throws Exception {
         final Map<String, String> properties =
-                new TDMsgCsv()
+                new InLongMsgCsv()
                         .schema(FormatUtils.marshall(TEST_FORMAT_SCHEMA))
                         .delimiter(';')
                         .charset(StandardCharsets.ISO_8859_1)
@@ -80,11 +80,11 @@ public class TDMsgCsvFormatFactoryTest {
                         .toProperties();
         assertNotNull(properties);
 
-        final TDMsgCsvFormatDeserializer expectedDeser =
-                new TDMsgCsvFormatDeserializer(
+        final InLongMsgCsvFormatDeserializer expectedDeser =
+                new InLongMsgCsvFormatDeserializer(
                         TEST_FORMAT_SCHEMA,
-                        TDMsgUtils.DEFAULT_TIME_FIELD_NAME,
-                        TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME,
+                        InLongMsgUtils.DEFAULT_TIME_FIELD_NAME,
+                        InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME,
                         StandardCharsets.ISO_8859_1.name(),
                         ';',
                         '\\',
@@ -111,10 +111,10 @@ public class TDMsgCsvFormatFactoryTest {
                         .schema(TableSchema.fromTypeInfo(SCHEMA))
                         .toProperties()
         );
-        properties.putAll(new TDMsgCsv().deriveSchema().toProperties());
+        properties.putAll(new InLongMsgCsv().deriveSchema().toProperties());
 
-        final TDMsgCsvFormatDeserializer expectedDeser =
-                new TDMsgCsvFormatDeserializer(TEST_FORMAT_SCHEMA);
+        final InLongMsgCsvFormatDeserializer expectedDeser =
+                new InLongMsgCsvFormatDeserializer(TEST_FORMAT_SCHEMA);
 
         final TableFormatDeserializer actualDeser =
                 TableFormatUtils.getTableFormatDeserializer(
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvTest.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvTest.java
similarity index 91%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvTest.java
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvTest.java
index 9478430..831f9c9 100644
--- a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/java/org/apache/inlong/sort/formats/tdmsgcsv/TDMsgCsvTest.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.tdmsgcsv;
+package org.apache.inlong.sort.formats.inlongmsgcsv;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -27,13 +27,13 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.descriptors.Descriptor;
 import org.apache.flink.table.descriptors.DescriptorTestBase;
 import org.apache.flink.table.descriptors.DescriptorValidator;
-import org.apache.inlong.sort.formats.tdmsg.TDMsgValidator;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgValidator;
 import org.junit.Test;
 
 /**
- * Tests for the {@link TDMsgCsv} descriptor.
+ * Tests for the {@link InLongMsgCsv} descriptor.
  */
-public class TDMsgCsvTest extends DescriptorTestBase {
+public class InLongMsgCsvTest extends DescriptorTestBase {
 
     private static final String TEST_SCHEMA =
             "{"
@@ -54,7 +54,7 @@ public class TDMsgCsvTest extends DescriptorTestBase {
                     + "}";
 
     private static final Descriptor CUSTOM_DESCRIPTOR_WITH_SCHEMA =
-            new TDMsgCsv()
+            new InLongMsgCsv()
                     .schema(TEST_SCHEMA)
                     .timeFieldName("time")
                     .attributesFieldName("attributes")
@@ -67,7 +67,7 @@ public class TDMsgCsvTest extends DescriptorTestBase {
                     .ignoreErrors();
 
     private static final Descriptor MINIMAL_DESCRIPTOR_WITH_DERIVED_SCHEMA =
-            new TDMsgCsv()
+            new InLongMsgCsv()
                     .deriveSchema();
 
     @Test(expected = ValidationException.class)
@@ -100,7 +100,7 @@ public class TDMsgCsvTest extends DescriptorTestBase {
     @Override
     public List<Map<String, String>> properties() {
         final Map<String, String> props1 = new HashMap<>();
-        props1.put("format.type", "tdmsgcsv");
+        props1.put("format.type", "inlongmsgcsv");
         props1.put("format.property-version", "1");
         props1.put("format.schema", TEST_SCHEMA);
         props1.put("format.time-field-name", "time");
@@ -114,7 +114,7 @@ public class TDMsgCsvTest extends DescriptorTestBase {
         props1.put("format.ignore-errors", "true");
 
         final Map<String, String> props2 = new HashMap<>();
-        props2.put("format.type", "tdmsgcsv");
+        props2.put("format.type", "inlongmsgcsv");
         props2.put("format.property-version", "1");
         props2.put("format.derive-schema", "true");
 
@@ -123,6 +123,6 @@ public class TDMsgCsvTest extends DescriptorTestBase {
 
     @Override
     public DescriptorValidator validator() {
-        return new TDMsgValidator();
+        return new InLongMsgValidator();
     }
 }
diff --git a/inlong-sort/sort-formats/format-tdmsg-csv/src/test/resources/log4j-test.properties b/inlong-sort/sort-formats/format-inlongmsg-csv/src/test/resources/log4j-test.properties
similarity index 100%
rename from inlong-sort/sort-formats/format-tdmsg-csv/src/test/resources/log4j-test.properties
rename to inlong-sort/sort-formats/format-inlongmsg-csv/src/test/resources/log4j-test.properties
diff --git a/inlong-sort/sort-formats/pom.xml b/inlong-sort/sort-formats/pom.xml
index 0349dd3..d6aad64 100644
--- a/inlong-sort/sort-formats/pom.xml
+++ b/inlong-sort/sort-formats/pom.xml
@@ -54,8 +54,8 @@
         <module>format-base</module>
         <module>format-csv</module>
         <module>format-kv</module>
-        <module>format-tdmsg-base</module>
-        <module>format-tdmsg-csv</module>
+        <module>format-inlongmsg-base</module>
+        <module>format-inlongmsg-csv</module>
     </modules>
 
     <properties>
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc
index 9049ab0..fa8841c 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc
@@ -1,147 +1,148 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <stdio.h>
-#include <string.h>
-#include <unistd.h>
-#include <signal.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <time.h>
-#include <fcntl.h>
-#include <errno.h>
-#include <libgen.h>
-#include <sys/time.h>
-#include <chrono>
-#include <set>
-#include <string>
-#include <thread>
-#include "tubemq/tubemq_client.h"
-#include "tubemq/tubemq_config.h"
-#include "tubemq/tubemq_errcode.h"
-#include "tubemq/tubemq_message.h"
-#include "tubemq/tubemq_return.h"
-
-
-using namespace std;
-using namespace tubemq;
-
-using std::set;
-using std::string;
-using tubemq::ConsumerConfig;
-using tubemq::ConsumerResult;
-using tubemq::TubeMQConsumer;
-
-
-
-
-
-
-int main(int argc, char* argv[]) {
-  bool result;
-  string err_info;
-  if (argc < 4) {
-    printf("\n must ./comd master_addr group_name topic_name [config_file_path]");
-    return -1;
-  }
-  // set parameters
-  string master_addr = argv[1];
-  string group_name = argv[2];
-  string topic_name = argv[3];
-  string conf_file = "../conf/client.conf";
-  if (argc > 4) {
-    conf_file = argv[4];
-  }
-
-  TubeMQConsumer consumer_1;
-  set<string> topic_list;
-  topic_list.insert(topic_name);
-  ConsumerConfig consumer_config;
-  TubeMQServiceConfig serviceConfig;
-
-  consumer_config.SetRpcReadTimeoutMs(20000);
-  result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
-  if (!result) {
-    printf("\n Set Master AddrInfo failure: %s", err_info.c_str());
-    return -1;
-  }
-  result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
-  if (!result) {
-    printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
-    return -1;
-  }
-  result = StartTubeMQService(err_info, serviceConfig);
-  if (!result) {
-    printf("\n StartTubeMQService failure: %s", err_info.c_str());
-    return -1;
-  }
-
-  result = consumer_1.Start(err_info, consumer_config);
-  if (!result) {
-    printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
-    return -2;
-  }
-
-  ConsumerResult gentRet;
-  ConsumerResult confirm_result;
-  int64_t start_time = time(NULL);
-  do {
-    // 1. get Message;
-    result = consumer_1.GetMessage(gentRet);
-    if (result) {
-      // 2.1.1  if success, process message
-      list<Message> msgs = gentRet.GetMessageList();
-      printf("\n GetMessage success, msssage count =%ld ", msgs.size());
-      // 2.1.2 confirm message result
-      consumer_1.Confirm(gentRet.GetConfirmContext(), true, confirm_result);
-    } else {
-      // 2.2.1 if failure, check error code
-      // print error message if errcode not in
-      // [no partitions assigned, all partitions in use,
-      //    or all partitons idle, reach max position]
-      if (!(gentRet.GetErrCode() == err_code::kErrNotFound
-        || gentRet.GetErrCode() == err_code::kErrNoPartAssigned
-        || gentRet.GetErrCode() == err_code::kErrAllPartInUse
-        || gentRet.GetErrCode() == err_code::kErrAllPartWaiting)) {
-        printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
-          gentRet.GetErrCode(), gentRet.GetErrMessage().c_str());
-      }
-    }
-    // used for test, consume 10 minutes only
-    if (time(NULL) - start_time > 10 * 60) {
-      break;
-    }
-  } while (true);
-
-  getchar();  // for test hold the test thread
-  consumer_1.ShutDown();
-
-  getchar();  // for test hold the test thread
-  result = StopTubeMQService(err_info);
-  if (!result) {
-    printf("\n *** StopTubeMQService failure, reason is %s ", err_info.c_str());
-  }
-
-  printf("\n finishe test exist ");
-  return 0;
-}
-
-
-
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <time.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <libgen.h>
+#include <sys/time.h>
+#include <chrono>
+#include <set>
+#include <string>
+#include <thread>
+#include "tubemq/tubemq_client.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_errcode.h"
+#include "tubemq/tubemq_message.h"
+#include "tubemq/tubemq_return.h"
+
+
+using namespace std;
+using namespace tubemq;
+
+using std::set;
+using std::string;
+using tubemq::ConsumerConfig;
+using tubemq::ConsumerResult;
+using tubemq::TubeMQConsumer;
+
+
+
+
+
+
+int main(int argc, char* argv[]) {
+  bool result;
+  string err_info;
+  if (argc < 4) {
+    printf("\n must ./comd master_addr group_name topic_name [config_file_path]");
+    return -1;
+  }
+  // set parameters
+  string master_addr = argv[1];
+  string group_name = argv[2];
+  string topic_name = argv[3];
+  string conf_file = "../conf/client.conf";
+  if (argc > 4) {
+    conf_file = argv[4];
+  }
+
+  TubeMQConsumer consumer_1;
+  set<string> topic_list;
+  topic_list.insert(topic_name);
+  ConsumerConfig consumer_config;
+  TubeMQServiceConfig serviceConfig;
+
+  consumer_config.SetRpcReadTimeoutMs(20000);
+  result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
+  if (!result) {
+    printf("\n Set Master AddrInfo failure: %s", err_info.c_str());
+    return -1;
+  }
+  result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
+  if (!result) {
+    printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
+    return -1;
+  }
+  result = StartTubeMQService(err_info, serviceConfig);
+  if (!result) {
+    printf("\n StartTubeMQService failure: %s", err_info.c_str());
+    return -1;
+  }
+
+  result = consumer_1.Start(err_info, consumer_config);
+  if (!result) {
+
+    printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
+    return -2;
+  }
+
+  ConsumerResult gentRet;
+  ConsumerResult confirm_result;
+  int64_t start_time = time(NULL);
+  do {
+    // 1. get Message;
+    result = consumer_1.GetMessage(gentRet);
+    if (result) {
+      // 2.1.1  if success, process message
+      list<Message> msgs = gentRet.GetMessageList();
+      printf("\n GetMessage success, msssage count =%ld ", msgs.size());
+      // 2.1.2 confirm message result
+      consumer_1.Confirm(gentRet.GetConfirmContext(), true, confirm_result);
+    } else {
+      // 2.2.1 if failure, check error code
+      // print error message if errcode not in
+      // [no partitions assigned, all partitions in use,
+      //    or all partitons idle, reach max position]
+      if (!(gentRet.GetErrCode() == err_code::kErrNotFound
+        || gentRet.GetErrCode() == err_code::kErrNoPartAssigned
+        || gentRet.GetErrCode() == err_code::kErrAllPartInUse
+        || gentRet.GetErrCode() == err_code::kErrAllPartWaiting)) {
+        printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
+          gentRet.GetErrCode(), gentRet.GetErrMessage().c_str());
+      }
+    }
+    // used for test, consume 10 minutes only
+    if (time(NULL) - start_time > 10 * 60) {
+      break;
+    }
+  } while (true);
+
+  getchar();  // for test hold the test thread
+  consumer_1.ShutDown();
+
+  getchar();  // for test hold the test thread
+  result = StopTubeMQService(err_info);
+  if (!result) {
+    printf("\n *** StopTubeMQService failure, reason is %s ", err_info.c_str());
+  }
+
+  printf("\n finishe test exist ");
+  return 0;
+}
+
+
+
+
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multi_thread_filter.cc b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multi_thread_filter.cc
index c20fb15..cbd4a45 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multi_thread_filter.cc
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multi_thread_filter.cc
@@ -1,266 +1,267 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <stdio.h>
-#include <string.h>
-#include <unistd.h>
-#include <signal.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <time.h>
-#include <fcntl.h>
-#include <errno.h>
-#include <libgen.h>
-#include <sys/time.h>
-#include <chrono>
-#include <map>
-#include <list>
-#include <set>
-#include <string>
-#include <thread>
-#include "tubemq/tubemq_atomic.h"
-#include "tubemq/tubemq_client.h"
-#include "tubemq/tubemq_config.h"
-#include "tubemq/tubemq_errcode.h"
-#include "tubemq/tubemq_message.h"
-#include "tubemq/tubemq_return.h"
-#include "tubemq/tubemq_tdmsg.h"
-
-
-
-
-using namespace std;
-using namespace tubemq;
-
-TubeMQConsumer consumer_1;
-
-
-
-AtomicLong last_print_time(0);
-AtomicLong last_msg_count(0);
-AtomicLong last_print_count(0);
-
-
-
-bool parse_TDMsg1_type_msg(const list<Message>& messageSet) {
-  string err_info;
-  list<Message>::const_iterator it_list;
-  map<string, string>::const_iterator it_attr;
-  list<DataItem>::const_iterator it_item;
-  map<string, list<DataItem> >::const_iterator it_map;
-  for (it_list = messageSet.begin(); it_list != messageSet.end(); ++it_list) {
-    printf("\nMessage id is %ld, topic is %s",
-      it_list->GetMessageId(), it_list->GetTopic().c_str());
-    TubeMQTDMsg tubemq_tdmsg;
-    if (tubemq_tdmsg.ParseTDMsg(it_list->GetData(), it_list->GetDataLength(), err_info)) {
-      printf("\n parse data success, version is %d, createTime is %ld",
-        tubemq_tdmsg.GetVersion(), tubemq_tdmsg.GetCreateTime());
-      map<string, list<DataItem> > data_map = tubemq_tdmsg.GetAttr2DataMap();
-      for (it_map = data_map.begin(); it_map != data_map.end(); ++it_map) {
-        map<string, string> key_vals;
-        if (!tubemq_tdmsg.ParseAttrValue(it_map->first, key_vals, err_info)) {
-          printf("\n parse attribute error, attr is %s, reason is %s",
-            (it_map->first).c_str(), err_info.c_str());
-          continue;
-        }
-        printf("\n parsed attribute:");
-        for (it_attr = key_vals.begin(); it_attr != key_vals.end(); ++it_attr) {
-          printf("\nkey is %s, value is %s",
-            (it_attr->first).c_str(), (it_attr->second).c_str());
-        }
-        list<DataItem> data_items = it_map->second;
-        printf("\n parsed msg count is %ld", data_items.size());
-        for (it_item = data_items.begin(); it_item != data_items.end(); ++it_item) {
-          printf("\n parsed msg data' length is %d, value is: %s",
-            it_item->GetLength(), it_item->GetData());
-        }
-      }
-    } else {
-      printf("\n \n parse data error, reason is %s\n", err_info.c_str());
-      break;
-    }
-  }
-  return true;
-}
-
-
-bool parse_raw_type_msg(const list<Message>& messageSet) {
-  int64_t last_time = last_print_time.Get();
-  int64_t cur_count = last_msg_count.AddAndGet(messageSet.size());
-  int64_t cur_time = time(NULL);
-  if (cur_count - last_print_count.Get() >= 50000
-    || cur_time - last_time > 90) {
-    if (last_print_time.CompareAndSet(last_time, cur_time)) {
-      printf("\n %ld Current message count=%ld", cur_time, last_msg_count.Get());
-      last_print_count.Set(cur_count);
-    }
-  }
-  return true;
-}
-
-void thread_task_pull(int32_t thread_no) {
-  bool result;
-  ConsumerResult gentRet;
-  ConsumerResult confirm_result;
-  printf("\n thread_task_pull start: %d", thread_no);
-  do {
-    // 1. get Message;
-    result = consumer_1.GetMessage(gentRet);
-    if (result) {
-      // 2.1.1  if success, process message
-      list<Message> msgs = gentRet.GetMessageList();
-      // 2.1.2 confirm message result
-      consumer_1.Confirm(gentRet.GetConfirmContext(), true, confirm_result);
-      parse_TDMsg1_type_msg(msgs);
-      // parse_raw_type_msg(msgs);
-    } else {
-      // 2.2.1 if failure, check error code
-      // print error message if errcode not in
-      // [no partitions assigned, all partitions in use,
-      //    or all partitons idle, reach max position]
-      if (!(gentRet.GetErrCode() == err_code::kErrNotFound
-        || gentRet.GetErrCode() == err_code::kErrNoPartAssigned
-        || gentRet.GetErrCode() == err_code::kErrAllPartInUse
-        || gentRet.GetErrCode() == err_code::kErrAllPartWaiting)) {
-        if (gentRet.GetErrCode() == err_code::kErrMQServiceStop
-          || gentRet.GetErrCode() == err_code::kErrClientStop) {
-          break;
-        }
-        printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
-          gentRet.GetErrCode(), gentRet.GetErrMessage().c_str());
-      }
-    }
-  } while (true);
-  printf("\n thread_task_pull finished: %d", thread_no);
-}
-
-
-int main(int argc, char* argv[]) {
-  bool result;
-  string err_info;
-  if (argc < 4) {
-    printf("\n must ./comd master_addr group_name topic_name [config_file_path]");
-    return -1;
-  }
-  // set parameters
-  string master_addr = argv[1];
-  string group_name = argv[2];
-  string topic_name = argv[3];
-  string conf_file = "../conf/client.conf";
-  if (argc > 4) {
-    conf_file = argv[4];
-  }
-  int32_t thread_num = 15;
-  ConsumerConfig consumer_config;
-  TubeMQServiceConfig serviceConfig;
-  consumer_config.SetRpcReadTimeoutMs(20000);
-  result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
-  if (!result) {
-    printf("\n Set Master AddrInfo failure: %s ", err_info.c_str());
-    return -1;
-  }
-
-  // non-filter consume begin
-  set<string> topic_list;
-  topic_list.insert(topic_name);
-  result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
-  if (!result) {
-    printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
-    return -1;
-  }
-  //  non-filter consume end
-
-
-/*
-  // filter consume begin
-  set<string> filters;
-  filters.insert("aaa");
-  filters.insert("bbb");
-  filters.insert("xxb");
-  map<string, set<string> > subscribed_topic_and_filter_map;
-  subscribed_topic_and_filter_map["test_1"] = filters; 
-  result = consumer_config.SetGroupConsumeTarget(err_info,
-    group_name, subscribed_topic_and_filter_map);
-  if (!result) {
-    printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
-    return -1;
-  }
-  // filter consume end
-
-  // bound consume begin
-  set<string> filters;
-  filters.insert("aaa");
-  filters.insert("bbb");
-  filters.insert("xxb");
-  map<string, set<string> > subscribed_topic_and_filter_map;
-  subscribed_topic_and_filter_map[topic_name] = filters;
-  string session_key = "test";
-  uint32_t source_count = 2;
-  bool is_select_big = true;
-  map<string, int64_t> part_offset_map;
-  part_offset_map["181895251:test_1:0"] = 0;
-  part_offset_map["181895251:test_1:10"] = 0;
-  part_offset_map["181895251:test_1:20"] = 0;
-  part_offset_map["181895251:test_1:30"] = 0;
-  result =  consumer_config.SetGroupConsumeTarget(err_info, group_name,
-    subscribed_topic_and_filter_map, session_key,
-    source_count, is_select_big, part_offset_map);
-  if (!result) {
-    printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
-    return -1;
-  }
-// bound consume end
-*/
-
-  result = StartTubeMQService(err_info, serviceConfig);
-  if (!result) {
-    printf("\n StartTubeMQService failure: %s", err_info.c_str());
-    return -1;
-  }
-
-  result = consumer_1.Start(err_info, consumer_config);
-  if (!result) {
-    consumer_1.ShutDown();
-    printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
-    return -2;
-  }
-  std::thread pull_threads[thread_num];
-  for (int32_t i = 0; i < thread_num; i++) {
-    pull_threads[i] = std::thread(thread_task_pull, i);
-  }
-
-  getchar();  // for test hold the test thread
-  consumer_1.ShutDown();
-  //
-  for (int32_t i = 0; i < thread_num; i++) {
-    if (pull_threads[i].joinable()) {
-      pull_threads[i].join();
-    }
-  }
-
-  getchar();  // for test hold the test thread
-  result = StopTubeMQService(err_info);
-  if (!result) {
-    printf("\n *** StopTubeMQService failure, reason is %s", err_info.c_str());
-  }
-
-  printf("\n finishe test exist");
-  return 0;
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <time.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <libgen.h>
+#include <sys/time.h>
+#include <chrono>
+#include <map>
+#include <list>
+#include <set>
+#include <string>
+#include <thread>
+#include "tubemq/tubemq_atomic.h"
+#include "tubemq/tubemq_client.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_errcode.h"
+#include "tubemq/tubemq_message.h"
+#include "tubemq/tubemq_return.h"
+#include "tubemq/tubemq_tdmsg.h"
+
+
+
+
+using namespace std;
+using namespace tubemq;
+
+TubeMQConsumer consumer_1;
+
+
+
+AtomicLong last_print_time(0);
+AtomicLong last_msg_count(0);
+AtomicLong last_print_count(0);
+
+
+
+bool parse_TDMsg1_type_msg(const list<Message>& messageSet) {
+  string err_info;
+  list<Message>::const_iterator it_list;
+  map<string, string>::const_iterator it_attr;
+  list<DataItem>::const_iterator it_item;
+  map<string, list<DataItem> >::const_iterator it_map;
+  for (it_list = messageSet.begin(); it_list != messageSet.end(); ++it_list) {
+    printf("\nMessage id is %ld, topic is %s",
+      it_list->GetMessageId(), it_list->GetTopic().c_str());
+    TubeMQTDMsg tubemq_tdmsg;
+    if (tubemq_tdmsg.ParseTDMsg(it_list->GetData(), it_list->GetDataLength(), err_info)) {
+      printf("\n parse data success, version is %d, createTime is %ld",
+        tubemq_tdmsg.GetVersion(), tubemq_tdmsg.GetCreateTime());
+      map<string, list<DataItem> > data_map = tubemq_tdmsg.GetAttr2DataMap();
+      for (it_map = data_map.begin(); it_map != data_map.end(); ++it_map) {
+        map<string, string> key_vals;
+        if (!tubemq_tdmsg.ParseAttrValue(it_map->first, key_vals, err_info)) {
+          printf("\n parse attribute error, attr is %s, reason is %s",
+            (it_map->first).c_str(), err_info.c_str());
+          continue;
+        }
+        printf("\n parsed attribute:");
+        for (it_attr = key_vals.begin(); it_attr != key_vals.end(); ++it_attr) {
+          printf("\nkey is %s, value is %s",
+            (it_attr->first).c_str(), (it_attr->second).c_str());
+        }
+        list<DataItem> data_items = it_map->second;
+        printf("\n parsed msg count is %ld", data_items.size());
+        for (it_item = data_items.begin(); it_item != data_items.end(); ++it_item) {
+          printf("\n parsed msg data' length is %d, value is: %s",
+            it_item->GetLength(), it_item->GetData());
+        }
+      }
+    } else {
+      printf("\n \n parse data error, reason is %s\n", err_info.c_str());
+      break;
+    }
+  }
+  return true;
+}
+
+
+bool parse_raw_type_msg(const list<Message>& messageSet) {
+  int64_t last_time = last_print_time.Get();
+  int64_t cur_count = last_msg_count.AddAndGet(messageSet.size());
+  int64_t cur_time = time(NULL);
+  if (cur_count - last_print_count.Get() >= 50000
+    || cur_time - last_time > 90) {
+    if (last_print_time.CompareAndSet(last_time, cur_time)) {
+      printf("\n %ld Current message count=%ld", cur_time, last_msg_count.Get());
+      last_print_count.Set(cur_count);
+    }
+  }
+  return true;
+}
+
+void thread_task_pull(int32_t thread_no) {
+  bool result;
+  ConsumerResult gentRet;
+  ConsumerResult confirm_result;
+  printf("\n thread_task_pull start: %d", thread_no);
+  do {
+    // 1. get Message;
+    result = consumer_1.GetMessage(gentRet);
+    if (result) {
+      // 2.1.1  if success, process message
+      list<Message> msgs = gentRet.GetMessageList();
+      // 2.1.2 confirm message result
+      consumer_1.Confirm(gentRet.GetConfirmContext(), true, confirm_result);
+      parse_TDMsg1_type_msg(msgs);
+      // parse_raw_type_msg(msgs);
+    } else {
+      // 2.2.1 if failure, check error code
+      // print error message if errcode not in
+      // [no partitions assigned, all partitions in use,
+      //    or all partitons idle, reach max position]
+      if (!(gentRet.GetErrCode() == err_code::kErrNotFound
+        || gentRet.GetErrCode() == err_code::kErrNoPartAssigned
+        || gentRet.GetErrCode() == err_code::kErrAllPartInUse
+        || gentRet.GetErrCode() == err_code::kErrAllPartWaiting)) {
+        if (gentRet.GetErrCode() == err_code::kErrMQServiceStop
+          || gentRet.GetErrCode() == err_code::kErrClientStop) {
+          break;
+        }
+        printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
+          gentRet.GetErrCode(), gentRet.GetErrMessage().c_str());
+      }
+    }
+  } while (true);
+  printf("\n thread_task_pull finished: %d", thread_no);
+}
+
+
+int main(int argc, char* argv[]) {
+  bool result;
+  string err_info;
+  if (argc < 4) {
+    printf("\n must ./comd master_addr group_name topic_name [config_file_path]");
+    return -1;
+  }
+  // set parameters
+  string master_addr = argv[1];
+  string group_name = argv[2];
+  string topic_name = argv[3];
+  string conf_file = "../conf/client.conf";
+  if (argc > 4) {
+    conf_file = argv[4];
+  }
+  int32_t thread_num = 15;
+  ConsumerConfig consumer_config;
+  TubeMQServiceConfig serviceConfig;
+  consumer_config.SetRpcReadTimeoutMs(20000);
+  result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
+  if (!result) {
+    printf("\n Set Master AddrInfo failure: %s ", err_info.c_str());
+    return -1;
+  }
+
+  // non-filter consume begin
+  set<string> topic_list;
+  topic_list.insert(topic_name);
+  result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
+  if (!result) {
+    printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
+    return -1;
+  }
+  //  non-filter consume end
+
+
+/*
+  // filter consume begin
+  set<string> filters;
+  filters.insert("aaa");
+  filters.insert("bbb");
+  filters.insert("xxb");
+  map<string, set<string> > subscribed_topic_and_filter_map;
+  subscribed_topic_and_filter_map["test_1"] = filters; 
+  result = consumer_config.SetGroupConsumeTarget(err_info,
+    group_name, subscribed_topic_and_filter_map);
+  if (!result) {
+    printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
+    return -1;
+  }
+  // filter consume end
+
+  // bound consume begin
+  set<string> filters;
+  filters.insert("aaa");
+  filters.insert("bbb");
+  filters.insert("xxb");
+  map<string, set<string> > subscribed_topic_and_filter_map;
+  subscribed_topic_and_filter_map[topic_name] = filters;
+  string session_key = "test";
+  uint32_t source_count = 2;
+  bool is_select_big = true;
+  map<string, int64_t> part_offset_map;
+  part_offset_map["181895251:test_1:0"] = 0;
+  part_offset_map["181895251:test_1:10"] = 0;
+  part_offset_map["181895251:test_1:20"] = 0;
+  part_offset_map["181895251:test_1:30"] = 0;
+  result =  consumer_config.SetGroupConsumeTarget(err_info, group_name,
+    subscribed_topic_and_filter_map, session_key,
+    source_count, is_select_big, part_offset_map);
+  if (!result) {
+    printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
+    return -1;
+  }
+// bound consume end
+*/
+
+  result = StartTubeMQService(err_info, serviceConfig);
+  if (!result) {
+    printf("\n StartTubeMQService failure: %s", err_info.c_str());
+    return -1;
+  }
+
+  result = consumer_1.Start(err_info, consumer_config);
+  if (!result) {
+
+    consumer_1.ShutDown();
+    printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
+    return -2;
+  }
+  std::thread pull_threads[thread_num];
+  for (int32_t i = 0; i < thread_num; i++) {
+    pull_threads[i] = std::thread(thread_task_pull, i);
+  }
+
+  getchar();  // for test hold the test thread
+  consumer_1.ShutDown();
+  //
+  for (int32_t i = 0; i < thread_num; i++) {
+    if (pull_threads[i].joinable()) {
+      pull_threads[i].join();
+    }
+  }
+
+  getchar();  // for test hold the test thread
+  result = StopTubeMQService(err_info);
+  if (!result) {
+    printf("\n *** StopTubeMQService failure, reason is %s", err_info.c_str());
+  }
+
+  printf("\n finishe test exist");
+  return 0;
+}
+
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc
index 1e33cea..ef09691 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc
@@ -1,177 +1,178 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <errno.h>
-#include <fcntl.h>
-#include <libgen.h>
-#include <signal.h>
-#include <stdio.h>
-#include <string.h>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <unistd.h>
-#include <time.h>
-#include <string>
-#include <chrono>
-#include <set>
-#include <thread>
-#include "tubemq/tubemq_atomic.h"
-#include "tubemq/tubemq_client.h"
-#include "tubemq/tubemq_config.h"
-#include "tubemq/tubemq_errcode.h"
-#include "tubemq/tubemq_message.h"
-#include "tubemq/tubemq_return.h"
-
-
-using namespace std;
-using namespace tubemq;
-
-TubeMQConsumer consumer_1;
-
-
-
-AtomicLong last_print_time(0);
-AtomicLong last_msg_count(0);
-AtomicLong last_print_count(0);
-
-
-
-void calc_message_count(int64_t msg_count) {
-  int64_t last_time = last_print_time.Get();
-  int64_t cur_count = last_msg_count.AddAndGet(msg_count);
-  int64_t cur_time = time(NULL);
-  if (cur_count - last_print_count.Get() >= 50000
-    || cur_time - last_time > 90) {
-    if (last_print_time.CompareAndSet(last_time, cur_time)) {
-      printf("\n %ld Current message count=%ld", cur_time, last_msg_count.Get());
-      last_print_count.Set(cur_count);
-    }
-  }
-}
-
-void thread_task_pull(int32_t thread_no) {
-  bool result;
-  int64_t msg_count = 0;
-  ConsumerResult gentRet;
-  ConsumerResult confirm_result;
-  printf("\n thread_task_pull start: %d", thread_no);
-  do {
-    msg_count = 0;
-    // 1. get Message;
-    result = consumer_1.GetMessage(gentRet);
-    if (result) {
-      // 2.1.1  if success, process message
-      list<Message> msgs = gentRet.GetMessageList();
-      msg_count = msgs.size();
-      // 2.1.2 confirm message result
-      consumer_1.Confirm(gentRet.GetConfirmContext(), true, confirm_result);
-    } else {
-      // 2.2.1 if failure, check error code
-      // print error message if errcode not in
-      // [no partitions assigned, all partitions in use,
-      //    or all partitons idle, reach max position]
-      if (!(gentRet.GetErrCode() == err_code::kErrNotFound
-        || gentRet.GetErrCode() == err_code::kErrNoPartAssigned
-        || gentRet.GetErrCode() == err_code::kErrAllPartInUse
-        || gentRet.GetErrCode() == err_code::kErrAllPartWaiting)) {
-        if (gentRet.GetErrCode() == err_code::kErrMQServiceStop
-          || gentRet.GetErrCode() == err_code::kErrClientStop) {
-          break;
-        }
-        printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
-          gentRet.GetErrCode(), gentRet.GetErrMessage().c_str());
-      }
-    }
-    calc_message_count(msg_count);
-  } while (true);
-  printf("\n thread_task_pull finished: %d", thread_no);
-}
-
-
-int main(int argc, char* argv[]) {
-  bool result;
-  string err_info;
-
-  if (argc < 4) {
-    printf("\n must ./comd master_addr group_name topic_name [config_file_path]");
-    return -1;
-  }
-  // set parameters
-  string master_addr = argv[1];
-  string group_name = argv[2];
-  string topic_name = argv[3];
-  string conf_file = "../conf/client.conf";
-  if (argc > 4) {
-    conf_file = argv[4];
-  }
-
-  int32_t thread_num = 15;
-  set<string> topic_list;
-  topic_list.insert(topic_name);
-  ConsumerConfig consumer_config;
-  TubeMQServiceConfig serviceConfig;
-
-  consumer_config.SetRpcReadTimeoutMs(20000);
-  result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
-  if (!result) {
-    printf("\n Set Master AddrInfo failure: %s ", err_info.c_str());
-    return -1;
-  }
-  result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
-  if (!result) {
-    printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
-    return -1;
-  }
-  result = StartTubeMQService(err_info, serviceConfig);
-  if (!result) {
-    printf("\n StartTubeMQService failure: %s", err_info.c_str());
-    return -1;
-  }
-
-  result = consumer_1.Start(err_info, consumer_config);
-  if (!result) {
-    printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
-    return -2;
-  }
-
-  std::thread pull_threads[thread_num];
-  for (int32_t i = 0; i < thread_num; i++) {
-    pull_threads[i] = std::thread(thread_task_pull, i);
-  }
-
-  getchar();  // for test hold the test thread
-  consumer_1.ShutDown();
-  //
-  for (int32_t i = 0; i < thread_num; i++) {
-    if (pull_threads[i].joinable()) {
-      pull_threads[i].join();
-    }
-  }
-
-  getchar();  // for test hold the test thread
-  result = StopTubeMQService(err_info);
-  if (!result) {
-    printf("\n *** StopTubeMQService failure, reason is %s", err_info.c_str());
-  }
-
-  printf("\n finishe test exist");
-  return 0;
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <errno.h>
+#include <fcntl.h>
+#include <libgen.h>
+#include <signal.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <time.h>
+#include <string>
+#include <chrono>
+#include <set>
+#include <thread>
+#include "tubemq/tubemq_atomic.h"
+#include "tubemq/tubemq_client.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_errcode.h"
+#include "tubemq/tubemq_message.h"
+#include "tubemq/tubemq_return.h"
+
+
+using namespace std;
+using namespace tubemq;
+
+TubeMQConsumer consumer_1;
+
+
+
+AtomicLong last_print_time(0);
+AtomicLong last_msg_count(0);
+AtomicLong last_print_count(0);
+
+
+
+// Adds msg_count to the global received-message counter and prints a progress
+// line once at least 50000 messages have arrived since the last report, or
+// more than 90 seconds have passed since it.
+//
+// msg_count: number of messages obtained by the caller in this iteration
+//            (0 when nothing was fetched).
+void calc_message_count(int64_t msg_count) {
+  int64_t last_time = last_print_time.Get();
+  int64_t cur_count = last_msg_count.AddAndGet(msg_count);
+  int64_t cur_time = time(NULL);
+  if (cur_count - last_print_count.Get() >= 50000
+    || cur_time - last_time > 90) {
+    // Printing is gated on CompareAndSet succeeding against the timestamp read
+    // above; presumably this lets a single caller report per interval.
+    if (last_print_time.CompareAndSet(last_time, cur_time)) {
+      // int64_t is not `long` on every platform, so cast to long long to make
+      // the arguments always match the %lld conversions.
+      printf("\n %lld Current message count=%lld",
+        static_cast<long long>(cur_time),
+        static_cast<long long>(last_msg_count.Get()));
+      last_print_count.Set(cur_count);
+    }
+  }
+}
+
+// Body of one pull thread: keeps fetching batches from the shared consumer,
+// confirms each successful batch, and reports the batch size to
+// calc_message_count. The loop ends when GetMessage fails with a
+// service-stopped or client-stopped error code.
+//
+// thread_no: index of this worker, used only in the start/finish log lines.
+void thread_task_pull(int32_t thread_no) {
+  ConsumerResult fetch_ret;
+  ConsumerResult confirm_ret;
+  printf("\n thread_task_pull start: %d", thread_no);
+  for (;;) {
+    int64_t fetched = 0;
+    if (consumer_1.GetMessage(fetch_ret)) {
+      // Fetch succeeded: take the batch size, then confirm the batch.
+      list<Message> batch = fetch_ret.GetMessageList();
+      fetched = batch.size();
+      consumer_1.Confirm(fetch_ret.GetConfirmContext(), true, confirm_ret);
+    } else {
+      const auto code = fetch_ret.GetErrCode();
+      // Codes that are not worth logging: nothing found, no partitions
+      // assigned, all partitions in use, or all partitions waiting.
+      const bool quiet_failure = code == err_code::kErrNotFound
+        || code == err_code::kErrNoPartAssigned
+        || code == err_code::kErrAllPartInUse
+        || code == err_code::kErrAllPartWaiting;
+      if (!quiet_failure) {
+        // A stopped service or client terminates this worker.
+        if (code == err_code::kErrMQServiceStop
+          || code == err_code::kErrClientStop) {
+          break;
+        }
+        printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
+          code, fetch_ret.GetErrMessage().c_str());
+      }
+    }
+    // Failed iterations still pass through here with a count of zero.
+    calc_message_count(fetched);
+  }
+  printf("\n thread_task_pull finished: %d", thread_no);
+}
+
+
+// Example entry point: consumes topic_name as group_name using a fixed pool
+// of pull threads that share one consumer.
+//
+// Usage: ./comd master_addr group_name topic_name [config_file_path]
+// Returns 0 after a normal run, -1 on argument, configuration or service
+// start failure, and -2 when the consumer cannot be started.
+int main(int argc, char* argv[]) {
+  bool result;
+  string err_info;
+
+  if (argc < 4) {
+    printf("\n must ./comd master_addr group_name topic_name [config_file_path]");
+    return -1;
+  }
+  // set parameters
+  string master_addr = argv[1];
+  string group_name = argv[2];
+  string topic_name = argv[3];
+  // NOTE(review): conf_file is parsed here but never read afterwards in this
+  // function - confirm whether serviceConfig is supposed to load it.
+  string conf_file = "../conf/client.conf";
+  if (argc > 4) {
+    conf_file = argv[4];
+  }
+
+  // Compile-time constant so that pull_threads below is a standard fixed-size
+  // array instead of a compiler-extension variable-length array.
+  const int32_t thread_num = 15;
+  set<string> topic_list;
+  topic_list.insert(topic_name);
+  ConsumerConfig consumer_config;
+  TubeMQServiceConfig serviceConfig;
+
+  consumer_config.SetRpcReadTimeoutMs(20000);
+  result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
+  if (!result) {
+    printf("\n Set Master AddrInfo failure: %s ", err_info.c_str());
+    return -1;
+  }
+  result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
+  if (!result) {
+    printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
+    return -1;
+  }
+  result = StartTubeMQService(err_info, serviceConfig);
+  if (!result) {
+    printf("\n StartTubeMQService failure: %s", err_info.c_str());
+    return -1;
+  }
+
+  result = consumer_1.Start(err_info, consumer_config);
+  if (!result) {
+    printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
+    // Release what was brought up above before bailing out: shut the consumer
+    // down and stop the service that was started successfully.
+    consumer_1.ShutDown();
+    string stop_err;
+    if (!StopTubeMQService(stop_err)) {
+      printf("\n *** StopTubeMQService failure, reason is %s", stop_err.c_str());
+    }
+    return -2;
+  }
+
+  std::thread pull_threads[thread_num];
+  for (int32_t i = 0; i < thread_num; i++) {
+    pull_threads[i] = std::thread(thread_task_pull, i);
+  }
+
+  getchar();  // for test hold the test thread
+  // Shut the consumer down first, then wait for every pull thread to finish.
+  consumer_1.ShutDown();
+  for (int32_t i = 0; i < thread_num; i++) {
+    if (pull_threads[i].joinable()) {
+      pull_threads[i].join();
+    }
+  }
+
+  getchar();  // for test hold the test thread
+  result = StopTubeMQService(err_info);
+  if (!result) {
+    printf("\n *** StopTubeMQService failure, reason is %s", err_info.c_str());
+  }
+
+  printf("\n finishe test exist");
+  return 0;
+}
+
diff --git a/licenses/inlong-dataproxy/LICENSE-binary b/licenses/inlong-dataproxy/LICENSE-binary
index ffa1b91..4cac0c0 100644
--- a/licenses/inlong-dataproxy/LICENSE-binary
+++ b/licenses/inlong-dataproxy/LICENSE-binary
@@ -1,4 +1,4 @@
-
+
                                  Apache License
                            Version 2.0, January 2004
                         http://www.apache.org/licenses/
diff --git a/licenses/inlong-dataproxy/licenses-binary/LICENSE-google-protobuf-java-format.txt b/licenses/inlong-dataproxy/licenses-binary/LICENSE-google-protobuf-java-format.txt
index 083dfff..f9ea500 100644
--- a/licenses/inlong-dataproxy/licenses-binary/LICENSE-google-protobuf-java-format.txt
+++ b/licenses/inlong-dataproxy/licenses-binary/LICENSE-google-protobuf-java-format.txt
@@ -1,27 +1,27 @@
- 
-	Copyright (c) 2009, Orbitz World Wide
-	All rights reserved.
-
-	Redistribution and use in source and binary forms, with or without modification, 
-	are permitted provided that the following conditions are met:
-
-		* Redistributions of source code must retain the above copyright notice, 
-		  this list of conditions and the following disclaimer.
-		* Redistributions in binary form must reproduce the above copyright notice, 
-		  this list of conditions and the following disclaimer in the documentation 
-		  and/or other materials provided with the distribution.
-		* Neither the name of the Orbitz World Wide nor the names of its contributors 
-		  may be used to endorse or promote products derived from this software 
-		  without specific prior written permission.
-
-	THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-	"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-	LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-	A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-	OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-	SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-	LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-	DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-	THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-	(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-	OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ 
+	Copyright (c) 2009, Orbitz World Wide
+	All rights reserved.
+
+	Redistribution and use in source and binary forms, with or without modification, 
+	are permitted provided that the following conditions are met:
+
+		* Redistributions of source code must retain the above copyright notice, 
+		  this list of conditions and the following disclaimer.
+		* Redistributions in binary form must reproduce the above copyright notice, 
+		  this list of conditions and the following disclaimer in the documentation 
+		  and/or other materials provided with the distribution.
+		* Neither the name of the Orbitz World Wide nor the names of its contributors 
+		  may be used to endorse or promote products derived from this software 
+		  without specific prior written permission.
+
+	THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+	"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+	LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+	A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+	OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+	SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+	LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+	DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+	THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+	(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+	OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/licenses/inlong-tubemq/LICENSE-binary b/licenses/inlong-tubemq/LICENSE-binary
index 46a0b3a..af2b7c1 100644
--- a/licenses/inlong-tubemq/LICENSE-binary
+++ b/licenses/inlong-tubemq/LICENSE-binary
@@ -1,4 +1,4 @@
-
+
                                  Apache License
                            Version 2.0, January 2004
                         http://www.apache.org/licenses/
diff --git a/licenses/inlong-tubemq/licenses-binary/LICENSE-google-protobuf-java-format.txt b/licenses/inlong-tubemq/licenses-binary/LICENSE-google-protobuf-java-format.txt
index 083dfff..f9ea500 100644
--- a/licenses/inlong-tubemq/licenses-binary/LICENSE-google-protobuf-java-format.txt
+++ b/licenses/inlong-tubemq/licenses-binary/LICENSE-google-protobuf-java-format.txt
@@ -1,27 +1,27 @@
- 
-	Copyright (c) 2009, Orbitz World Wide
-	All rights reserved.
-
-	Redistribution and use in source and binary forms, with or without modification, 
-	are permitted provided that the following conditions are met:
-
-		* Redistributions of source code must retain the above copyright notice, 
-		  this list of conditions and the following disclaimer.
-		* Redistributions in binary form must reproduce the above copyright notice, 
-		  this list of conditions and the following disclaimer in the documentation 
-		  and/or other materials provided with the distribution.
-		* Neither the name of the Orbitz World Wide nor the names of its contributors 
-		  may be used to endorse or promote products derived from this software 
-		  without specific prior written permission.
-
-	THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-	"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-	LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-	A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-	OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-	SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-	LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-	DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-	THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-	(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-	OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ 
+	Copyright (c) 2009, Orbitz World Wide
+	All rights reserved.
+
+	Redistribution and use in source and binary forms, with or without modification, 
+	are permitted provided that the following conditions are met:
+
+		* Redistributions of source code must retain the above copyright notice, 
+		  this list of conditions and the following disclaimer.
+		* Redistributions in binary form must reproduce the above copyright notice, 
+		  this list of conditions and the following disclaimer in the documentation 
+		  and/or other materials provided with the distribution.
+		* Neither the name of the Orbitz World Wide nor the names of its contributors 
+		  may be used to endorse or promote products derived from this software 
+		  without specific prior written permission.
+
+	THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+	"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+	LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+	A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+	OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+	SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+	LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+	DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+	THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+	(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+	OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.