You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/12/21 02:03:15 UTC

[incubator-inlong] branch master updated: [INLONG-2038][Bug]inlong-sort abandon data from pulsar due to an ClassCastException (#2041)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dd56ed  [INLONG-2038][Bug]inlong-sort abandon data from pulsar due to an ClassCastException (#2041)
7dd56ed is described below

commit 7dd56ed0e030e8dc280a741254fa3cba93b94051
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Tue Dec 21 10:02:42 2021 +0800

    [INLONG-2038][Bug]inlong-sort abandon data from pulsar due to an ClassCastException (#2041)
    
    Co-authored-by: tianqiwan <ti...@tencent.com>
---
 ...Record.java => TDMsgMixedSerializedRecord.java} |   6 +-
 .../deserialization/DeserializationSchema.java     |  68 ++++++--------
 .../deserialization/MultiTenancyDeserializer.java  | 104 +++++++++++++++++++++
 .../MultiTenancyTDMsgMixedDeserializer.java        |  20 ++--
 .../flink/deserialization/TDMsgDeserializer.java   |  40 ++++++++
 .../deserialization/TDMsgMixedDeserializer.java    |   6 +-
 .../flink/tubemq/MultiTenancyTubeConsumer.java     |   4 +-
 .../MultiTenancyTDMsgMixedDeserializerTest.java    |   4 +-
 .../TDMsgMixedDeserializerTest.java                |   4 +-
 9 files changed, 200 insertions(+), 56 deletions(-)

diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgMixedSerializedRecord.java
similarity index 87%
rename from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
rename to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgMixedSerializedRecord.java
index 37feba0..52164dc 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgMixedSerializedRecord.java
@@ -22,7 +22,7 @@ import static org.apache.inlong.sort.configuration.Constants.UNKNOWN_DATAFLOW_ID
 /**
  * Data flow id might not been got from mixed TDMsg data stream.
  */
-public class TDMsgSerializedRecord extends SerializedRecord {
+public class TDMsgMixedSerializedRecord extends SerializedRecord {
 
     private static final long serialVersionUID = 4075321919886376829L;
 
@@ -31,11 +31,11 @@ public class TDMsgSerializedRecord extends SerializedRecord {
     /**
      * Just satisfy requirement of Flink Pojo definition.
      */
-    public TDMsgSerializedRecord() {
+    public TDMsgMixedSerializedRecord() {
         super();
     }
 
-    public TDMsgSerializedRecord(String topic, long timestampMillis, byte[] data) {
+    public TDMsgMixedSerializedRecord(String topic, long timestampMillis, byte[] data) {
         super(UNKNOWN_DATAFLOW_ID, timestampMillis, data);
         this.topic = topic;
     }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
index ae194bc..5827ef2 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.sort.flink.deserialization;
 
-import static org.apache.inlong.sort.configuration.Constants.UNKNOWN_DATAFLOW_ID;
-
 import com.google.common.base.Preconditions;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
@@ -26,7 +24,7 @@ import org.apache.inlong.sort.configuration.Configuration;
 import org.apache.inlong.sort.configuration.Constants;
 import org.apache.inlong.sort.flink.Record;
 import org.apache.inlong.sort.flink.SerializedRecord;
-import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
+import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
 import org.apache.inlong.sort.flink.metrics.MetricData;
 import org.apache.inlong.sort.flink.metrics.MetricData.MetricSource;
 import org.apache.inlong.sort.flink.metrics.MetricData.MetricType;
@@ -58,6 +56,8 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
 
     private transient MultiTenancyTDMsgMixedDeserializer multiTenancyTdMsgMixedDeserializer;
 
+    private transient MultiTenancyDeserializer multiTenancyDeserializer;
+
     private transient MetaManager metaManager;
 
     private transient Boolean enableOutputMetrics;
@@ -70,6 +70,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
     public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
         schemaLock = new Object();
         multiTenancyTdMsgMixedDeserializer = new MultiTenancyTDMsgMixedDeserializer();
+        multiTenancyDeserializer = new MultiTenancyDeserializer();
         fieldMappingTransformer = new FieldMappingTransformer();
         recordTransformer = new RecordTransformer(config.getInteger(Constants.ETL_RECORD_SERIALIZATION_BUFFER_SIZE));
         metaManager = MetaManager.getInstance(config);
@@ -105,46 +106,34 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
                 context.output(METRIC_DATA_OUTPUT_TAG, metricData);
             }
 
-            if (serializedRecord instanceof TDMsgSerializedRecord
-                    && serializedRecord.getDataFlowId() == UNKNOWN_DATAFLOW_ID) {
-                final TDMsgSerializedRecord tdmsgRecord = (TDMsgSerializedRecord) serializedRecord;
-                synchronized (schemaLock) {
-                    multiTenancyTdMsgMixedDeserializer.deserialize(
-                            tdmsgRecord,
-                            new CallbackCollector<>(sourceRecord -> {
-                                final Record sinkRecord = fieldMappingTransformer.transform(sourceRecord);
-
-                                if (enableOutputMetrics) {
-                                    MetricData metricData = new MetricData(
-                                            // TODO, outputs this metric in Sink Function
-                                            MetricSource.SINK,
-                                            MetricType.SUCCESSFUL,
-                                            sinkRecord.getTimestampMillis(),
-                                            sinkRecord.getDataflowId(),
-                                            "",
-                                            1);
-
-                                    context.output(METRIC_DATA_OUTPUT_TAG, metricData);
-                                }
-
-                                collector.collect(recordTransformer.toSerializedRecord(sinkRecord));
-                            }));
-                }
-            } else {
-                // TODO, support more data types
-                if (enableOutputMetrics
-                        && !config.getString(Constants.SOURCE_TYPE).equals(Constants.SOURCE_TYPE_TUBE)) {
+            final CallbackCollector<Record> transformCollector = new CallbackCollector<>(sourceRecord -> {
+                final Record sinkRecord = fieldMappingTransformer.transform(sourceRecord);
+
+                if (enableOutputMetrics) {
                     MetricData metricData = new MetricData(
-                            MetricSource.DESERIALIZATION,
-                            MetricType.ABANDONED,
-                            serializedRecord.getTimestampMillis(),
-                            serializedRecord.getDataFlowId(),
-                            "Unsupported schema",
+                            // TODO, outputs this metric in Sink Function
+                            MetricSource.SINK,
+                            MetricType.SUCCESSFUL,
+                            sinkRecord.getTimestampMillis(),
+                            sinkRecord.getDataflowId(),
+                            "",
                             1);
+
                     context.output(METRIC_DATA_OUTPUT_TAG, metricData);
                 }
 
-                LOG.warn("Abandon data due to unsupported record {}", serializedRecord);
+                collector.collect(recordTransformer.toSerializedRecord(sinkRecord));
+            });
+
+            if (serializedRecord instanceof TDMsgMixedSerializedRecord) {
+                final TDMsgMixedSerializedRecord tdmsgRecord = (TDMsgMixedSerializedRecord) serializedRecord;
+                synchronized (schemaLock) {
+                    multiTenancyTdMsgMixedDeserializer.deserialize(tdmsgRecord, transformCollector);
+                }
+            } else {
+                synchronized (schemaLock) {
+                    multiTenancyDeserializer.deserialize(serializedRecord, transformCollector);
+                }
             }
         } catch (Exception e) {
             if (enableOutputMetrics
@@ -169,6 +158,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
         public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
             synchronized (schemaLock) {
                 multiTenancyTdMsgMixedDeserializer.addDataFlow(dataFlowInfo);
+                multiTenancyDeserializer.addDataFlow(dataFlowInfo);
                 fieldMappingTransformer.addDataFlow(dataFlowInfo);
                 recordTransformer.addDataFlow(dataFlowInfo);
             }
@@ -178,6 +168,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
         public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
             synchronized (schemaLock) {
                 multiTenancyTdMsgMixedDeserializer.updateDataFlow(dataFlowInfo);
+                multiTenancyDeserializer.updateDataFlow(dataFlowInfo);
                 fieldMappingTransformer.updateDataFlow(dataFlowInfo);
                 recordTransformer.updateDataFlow(dataFlowInfo);
             }
@@ -187,6 +178,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
         public void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
             synchronized (schemaLock) {
                 multiTenancyTdMsgMixedDeserializer.removeDataFlow(dataFlowInfo);
+                multiTenancyDeserializer.removeDataFlow(dataFlowInfo);
                 fieldMappingTransformer.removeDataFlow(dataFlowInfo);
                 recordTransformer.removeDataFlow(dataFlowInfo);
             }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyDeserializer.java
new file mode 100644
index 0000000..3e75aa5
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyDeserializer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.deserialization;
+
+import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_TIME_FIELD_NAME;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.flink.Record;
+import org.apache.inlong.sort.flink.SerializedRecord;
+import org.apache.inlong.sort.formats.base.TableFormatConstants;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.tdmsgcsv.TDMsgCsvFormatDeserializer;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.util.CommonUtils;
+
+public class MultiTenancyDeserializer implements DataFlowInfoListener, Deserializer<SerializedRecord, Record> {
+    /**
+     * Data flow id -> deserializer built from that data flow's deserialization info.
+     */
+    private final Map<Long, Deserializer<SerializedRecord, Record>> deserializers = new HashMap<>();
+
+    @Override
+    public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        updateDataFlow(dataFlowInfo);
+    }
+
+    @Override
+    public void updateDataFlow(DataFlowInfo dataFlowInfo) {
+        final DeserializationInfo deserializationInfo = dataFlowInfo.getSourceInfo().getDeserializationInfo();
+        // generateDeserializer rejects formats other than TDMsgCsv with UnsupportedOperationException.
+        final Deserializer<SerializedRecord, Record> deserializer = generateDeserializer(
+                dataFlowInfo.getSourceInfo().getFields(), deserializationInfo);
+        // Reached only if generation succeeded, so a failed update keeps the previous entry.
+        deserializers.put(dataFlowInfo.getId(), deserializer);
+    }
+
+    @Override
+    public void removeDataFlow(DataFlowInfo dataFlowInfo) {
+        deserializers.remove(dataFlowInfo.getId());
+    }
+
+    @Override
+    public void deserialize(SerializedRecord record, Collector<Record> collector) throws Exception {
+        final Deserializer<SerializedRecord, Record> deserializer = deserializers.get(record.getDataFlowId());
+        if (deserializer == null) {
+            throw new Exception("No schema found for data flow:" + record.getDataFlowId());
+        }
+        deserializer.deserialize(record, collector);
+    }
+
+    @VisibleForTesting
+    Deserializer<SerializedRecord, Record> generateDeserializer(
+            FieldInfo[] fields,
+            DeserializationInfo deserializationInfo) {
+        // Builds the per-data-flow deserializer from the flow's source fields and deserialization info.
+        final RowFormatInfo rowFormatInfo = CommonUtils.generateRowFormatInfo(fields);
+
+        final Deserializer<SerializedRecord, Record> deserializer;
+        if (deserializationInfo instanceof TDMsgCsvDeserializationInfo) {
+            TDMsgCsvDeserializationInfo tdMsgCsvDeserializationInfo = (TDMsgCsvDeserializationInfo) deserializationInfo;
+            TDMsgCsvFormatDeserializer tdMsgCsvFormatDeserializer = new TDMsgCsvFormatDeserializer(
+                    rowFormatInfo,
+                    DEFAULT_TIME_FIELD_NAME,
+                    DEFAULT_ATTRIBUTES_FIELD_NAME,
+                    TableFormatConstants.DEFAULT_CHARSET,
+                    tdMsgCsvDeserializationInfo.getDelimiter(),
+                    null,
+                    null,
+                    null,
+                    tdMsgCsvDeserializationInfo.isDeleteHeadDelimiter(),
+                    TableFormatConstants.DEFAULT_IGNORE_ERRORS);
+            deserializer = new TDMsgDeserializer(tdMsgCsvFormatDeserializer);
+        } else {
+            // TODO: support more formats. NOTE(review): a null deserializationInfo would NPE on getClass() here.
+            throw new UnsupportedOperationException(
+                    "Not supported yet " + deserializationInfo.getClass().getSimpleName());
+        }
+
+        return deserializer;
+    }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializer.java
index 0b5b2b5..080c4f7 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializer.java
@@ -27,7 +27,7 @@ import java.util.Map;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
+import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
 import org.apache.inlong.sort.formats.common.RowFormatInfo;
 import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgMixedFormatDeserializer;
 import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedFormatConverter;
@@ -39,6 +39,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
 import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
 import org.apache.inlong.sort.protocol.deserialization.TDMsgDeserializationInfo;
+import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
 import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
 import org.apache.inlong.sort.util.CommonUtils;
 
@@ -46,7 +47,7 @@ import org.apache.inlong.sort.util.CommonUtils;
  * A deserializer to handle mixed TDMsg records.
  */
 public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
-        Deserializer<TDMsgSerializedRecord, Record> {
+        Deserializer<TDMsgMixedSerializedRecord, Record> {
 
     /**
      * Maps topic to mixed deserializer.
@@ -71,10 +72,17 @@ public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
         final AbstractTDMsgMixedFormatDeserializer preDeserializer = allDeserializer.getLeft();
         final TDMsgMixedFormatConverter deserializer = allDeserializer.getRight();
 
-        // currently only tubeMQ source supports TDMsg format
-        final TubeSourceInfo tubeSourceInfo = (TubeSourceInfo) dataFlowInfo.getSourceInfo();
+        final String topic;
+        if (dataFlowInfo.getSourceInfo() instanceof TubeSourceInfo) {
+            topic = ((TubeSourceInfo) dataFlowInfo.getSourceInfo()).getTopic();
+        } else if (dataFlowInfo.getSourceInfo() instanceof PulsarSourceInfo) {
+            topic = ((PulsarSourceInfo) dataFlowInfo.getSourceInfo()).getTopic();
+        } else {
+            throw new UnsupportedOperationException("Unknown source type " + dataFlowInfo.getSourceInfo());
+        }
+
         final TDMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap
-                .computeIfAbsent(tubeSourceInfo.getTopic(), topic -> new TDMsgMixedDeserializer());
+                 .computeIfAbsent(topic, key -> new TDMsgMixedDeserializer());
         mixedDeserializer.updateDataFlow(
                 dataFlowInfo.getId(), tdMsgDeserializationInfo.getTid(), preDeserializer, deserializer);
     }
@@ -102,7 +110,7 @@ public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
         return deserializationInfo instanceof TDMsgDeserializationInfo;
     }
 
-    public void deserialize(TDMsgSerializedRecord record, Collector<Record> collector) throws Exception {
+    public void deserialize(TDMsgMixedSerializedRecord record, Collector<Record> collector) throws Exception {
         final String topic = record.getTopic();
         final TDMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap.get(topic);
         if (mixedDeserializer == null) {
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgDeserializer.java
new file mode 100644
index 0000000..bda542e
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgDeserializer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.deserialization;
+
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.flink.Record;
+import org.apache.inlong.sort.flink.SerializedRecord;
+import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgFormatDeserializer;
+
+public class TDMsgDeserializer implements Deserializer<SerializedRecord, Record> {
+    private final AbstractTDMsgFormatDeserializer innerDeserializer; // parses each record's data bytes
+
+    public TDMsgDeserializer(AbstractTDMsgFormatDeserializer innerDeserializer) {
+        this.innerDeserializer = innerDeserializer;
+    }
+
+    @Override
+    public void deserialize(SerializedRecord input, Collector<Record> collector) throws Exception {
+        innerDeserializer.flatMap(input.getData(),  new CallbackCollector<>(
+                row -> collector.collect(new Record(
+                        input.getDataFlowId(),
+                        input.getTimestampMillis(),
+                        row)))); // one Record per row, with the input's data flow id and timestamp
+    }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
index aff45ce..d396cb7 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
+import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
 import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgFormatDeserializer;
 import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedFormatConverter;
 import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
@@ -32,7 +32,7 @@ import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
 /**
  * A deserializer to handle mixed TDMsg records of one topic.
  */
-public class TDMsgMixedDeserializer implements Deserializer<TDMsgSerializedRecord, Record> {
+public class TDMsgMixedDeserializer implements Deserializer<TDMsgMixedSerializedRecord, Record> {
 
     /**
      * Each topic should have same preDeserializer, so just keep one.
@@ -76,7 +76,7 @@ public class TDMsgMixedDeserializer implements Deserializer<TDMsgSerializedRecor
     }
 
     @Override
-    public void deserialize(TDMsgSerializedRecord tdMsgRecord, Collector<Record> collector) throws Exception {
+    public void deserialize(TDMsgMixedSerializedRecord tdMsgRecord, Collector<Record> collector) throws Exception {
         preDeserializer.flatMap(tdMsgRecord.getData(), new CallbackCollector<>(mixedRow -> {
             final String tid = TDMsgUtils.getTidFromMixedRow(mixedRow);
             final Set<Long> dataFlowIds = interface2DataFlowsMap.get(tid);
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
index 028a89a..9601541 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
@@ -42,7 +42,7 @@ import org.apache.flink.util.TimeUtils;
 import org.apache.inlong.sort.configuration.Configuration;
 import org.apache.inlong.sort.configuration.Constants;
 import org.apache.inlong.sort.flink.SerializedRecord;
-import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
+import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
 import org.apache.inlong.sort.meta.MetaManager;
 import org.apache.inlong.sort.util.CommonUtils;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
@@ -396,7 +396,7 @@ public class MultiTenancyTubeConsumer {
                 synchronized (context.getCheckpointLock()) {
                     for (Message message : consumeResult.getMessageList()) {
                         // TODO, optimize for single tid or no tid topic
-                        context.collect(new TDMsgSerializedRecord(
+                        context.collect(new TDMsgMixedSerializedRecord(
                                 topic, System.currentTimeMillis(), message.getData()));
                     }
                     final String partitionKey = consumeResult.getPartitionKey();
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
index 37dddfc..fddb22c 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.inlong.commons.msg.TDMsg1;
 import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
+import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
 import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
@@ -85,7 +85,7 @@ public class MultiTenancyTDMsgMixedDeserializerTest extends TestLogger {
         tdMsg1.addMsg(attrs, body1.getBytes());
 
         final TestingCollector<Record> collector = new TestingCollector<>();
-        deserializer.deserialize(new TDMsgSerializedRecord("topic", 0, tdMsg1.buildArray()), collector);
+        deserializer.deserialize(new TDMsgMixedSerializedRecord("topic", 0, tdMsg1.buildArray()), collector);
 
         assertEquals(1, collector.results.size());
         assertEquals(1L, collector.results.get(0).getDataflowId());
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializerTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializerTest.java
index 8380f3d..7ed9c9b 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializerTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializerTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
+import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
 import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgFormatDeserializer;
 import org.apache.inlong.sort.formats.tdmsg.TDMsgBody;
 import org.apache.inlong.sort.formats.tdmsg.TDMsgHead;
@@ -93,7 +93,7 @@ public class TDMsgMixedDeserializerTest extends TestLogger {
         row.setField(2, tid);
         preDeserializer.records.add(row);
 
-        mixedDeserializer.deserialize(new TDMsgSerializedRecord(), collector);
+        mixedDeserializer.deserialize(new TDMsgMixedSerializedRecord(), collector);
         assertEquals(2, collector.results.size());
         assertEquals(dataFlowId1, collector.results.get(0).getDataflowId());
         assertEquals(tid, collector.results.get(0).getRow().getField(2));