You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/12/21 02:03:15 UTC
[incubator-inlong] branch master updated: [INLONG-2038][Bug]inlong-sort abandon data from pulsar due to an ClassCastException (#2041)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7dd56ed [INLONG-2038][Bug]inlong-sort abandon data from pulsar due to an ClassCastException (#2041)
7dd56ed is described below
commit 7dd56ed0e030e8dc280a741254fa3cba93b94051
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Tue Dec 21 10:02:42 2021 +0800
[INLONG-2038][Bug]inlong-sort abandon data from pulsar due to an ClassCastException (#2041)
Co-authored-by: tianqiwan <ti...@tencent.com>
---
...Record.java => TDMsgMixedSerializedRecord.java} | 6 +-
.../deserialization/DeserializationSchema.java | 68 ++++++--------
.../deserialization/MultiTenancyDeserializer.java | 104 +++++++++++++++++++++
.../MultiTenancyTDMsgMixedDeserializer.java | 20 ++--
.../flink/deserialization/TDMsgDeserializer.java | 40 ++++++++
.../deserialization/TDMsgMixedDeserializer.java | 6 +-
.../flink/tubemq/MultiTenancyTubeConsumer.java | 4 +-
.../MultiTenancyTDMsgMixedDeserializerTest.java | 4 +-
.../TDMsgMixedDeserializerTest.java | 4 +-
9 files changed, 200 insertions(+), 56 deletions(-)
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgMixedSerializedRecord.java
similarity index 87%
rename from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
rename to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgMixedSerializedRecord.java
index 37feba0..52164dc 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgMixedSerializedRecord.java
@@ -22,7 +22,7 @@ import static org.apache.inlong.sort.configuration.Constants.UNKNOWN_DATAFLOW_ID
/**
* Data flow id might not have been obtained from the mixed TDMsg data stream.
*/
-public class TDMsgSerializedRecord extends SerializedRecord {
+public class TDMsgMixedSerializedRecord extends SerializedRecord {
private static final long serialVersionUID = 4075321919886376829L;
@@ -31,11 +31,11 @@ public class TDMsgSerializedRecord extends SerializedRecord {
/**
* Just satisfy requirement of Flink Pojo definition.
*/
- public TDMsgSerializedRecord() {
+ public TDMsgMixedSerializedRecord() {
super();
}
- public TDMsgSerializedRecord(String topic, long timestampMillis, byte[] data) {
+ public TDMsgMixedSerializedRecord(String topic, long timestampMillis, byte[] data) {
super(UNKNOWN_DATAFLOW_ID, timestampMillis, data);
this.topic = topic;
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
index ae194bc..5827ef2 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
@@ -17,8 +17,6 @@
package org.apache.inlong.sort.flink.deserialization;
-import static org.apache.inlong.sort.configuration.Constants.UNKNOWN_DATAFLOW_ID;
-
import com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
@@ -26,7 +24,7 @@ import org.apache.inlong.sort.configuration.Configuration;
import org.apache.inlong.sort.configuration.Constants;
import org.apache.inlong.sort.flink.Record;
import org.apache.inlong.sort.flink.SerializedRecord;
-import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
+import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
import org.apache.inlong.sort.flink.metrics.MetricData;
import org.apache.inlong.sort.flink.metrics.MetricData.MetricSource;
import org.apache.inlong.sort.flink.metrics.MetricData.MetricType;
@@ -58,6 +56,8 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
private transient MultiTenancyTDMsgMixedDeserializer multiTenancyTdMsgMixedDeserializer;
+ private transient MultiTenancyDeserializer multiTenancyDeserializer;
+
private transient MetaManager metaManager;
private transient Boolean enableOutputMetrics;
@@ -70,6 +70,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
schemaLock = new Object();
multiTenancyTdMsgMixedDeserializer = new MultiTenancyTDMsgMixedDeserializer();
+ multiTenancyDeserializer = new MultiTenancyDeserializer();
fieldMappingTransformer = new FieldMappingTransformer();
recordTransformer = new RecordTransformer(config.getInteger(Constants.ETL_RECORD_SERIALIZATION_BUFFER_SIZE));
metaManager = MetaManager.getInstance(config);
@@ -105,46 +106,34 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
context.output(METRIC_DATA_OUTPUT_TAG, metricData);
}
- if (serializedRecord instanceof TDMsgSerializedRecord
- && serializedRecord.getDataFlowId() == UNKNOWN_DATAFLOW_ID) {
- final TDMsgSerializedRecord tdmsgRecord = (TDMsgSerializedRecord) serializedRecord;
- synchronized (schemaLock) {
- multiTenancyTdMsgMixedDeserializer.deserialize(
- tdmsgRecord,
- new CallbackCollector<>(sourceRecord -> {
- final Record sinkRecord = fieldMappingTransformer.transform(sourceRecord);
-
- if (enableOutputMetrics) {
- MetricData metricData = new MetricData(
- // TODO, outputs this metric in Sink Function
- MetricSource.SINK,
- MetricType.SUCCESSFUL,
- sinkRecord.getTimestampMillis(),
- sinkRecord.getDataflowId(),
- "",
- 1);
-
- context.output(METRIC_DATA_OUTPUT_TAG, metricData);
- }
-
- collector.collect(recordTransformer.toSerializedRecord(sinkRecord));
- }));
- }
- } else {
- // TODO, support more data types
- if (enableOutputMetrics
- && !config.getString(Constants.SOURCE_TYPE).equals(Constants.SOURCE_TYPE_TUBE)) {
+ final CallbackCollector<Record> transformCollector = new CallbackCollector<>(sourceRecord -> {
+ final Record sinkRecord = fieldMappingTransformer.transform(sourceRecord);
+
+ if (enableOutputMetrics) {
MetricData metricData = new MetricData(
- MetricSource.DESERIALIZATION,
- MetricType.ABANDONED,
- serializedRecord.getTimestampMillis(),
- serializedRecord.getDataFlowId(),
- "Unsupported schema",
+ // TODO, outputs this metric in Sink Function
+ MetricSource.SINK,
+ MetricType.SUCCESSFUL,
+ sinkRecord.getTimestampMillis(),
+ sinkRecord.getDataflowId(),
+ "",
1);
+
context.output(METRIC_DATA_OUTPUT_TAG, metricData);
}
- LOG.warn("Abandon data due to unsupported record {}", serializedRecord);
+ collector.collect(recordTransformer.toSerializedRecord(sinkRecord));
+ });
+
+ if (serializedRecord instanceof TDMsgMixedSerializedRecord) {
+ final TDMsgMixedSerializedRecord tdmsgRecord = (TDMsgMixedSerializedRecord) serializedRecord;
+ synchronized (schemaLock) {
+ multiTenancyTdMsgMixedDeserializer.deserialize(tdmsgRecord, transformCollector);
+ }
+ } else {
+ synchronized (schemaLock) {
+ multiTenancyDeserializer.deserialize(serializedRecord, transformCollector);
+ }
}
} catch (Exception e) {
if (enableOutputMetrics
@@ -169,6 +158,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
synchronized (schemaLock) {
multiTenancyTdMsgMixedDeserializer.addDataFlow(dataFlowInfo);
+ multiTenancyDeserializer.addDataFlow(dataFlowInfo);
fieldMappingTransformer.addDataFlow(dataFlowInfo);
recordTransformer.addDataFlow(dataFlowInfo);
}
@@ -178,6 +168,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
synchronized (schemaLock) {
multiTenancyTdMsgMixedDeserializer.updateDataFlow(dataFlowInfo);
+ multiTenancyDeserializer.updateDataFlow(dataFlowInfo);
fieldMappingTransformer.updateDataFlow(dataFlowInfo);
recordTransformer.updateDataFlow(dataFlowInfo);
}
@@ -187,6 +178,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
public void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
synchronized (schemaLock) {
multiTenancyTdMsgMixedDeserializer.removeDataFlow(dataFlowInfo);
+ multiTenancyDeserializer.removeDataFlow(dataFlowInfo);
fieldMappingTransformer.removeDataFlow(dataFlowInfo);
recordTransformer.removeDataFlow(dataFlowInfo);
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyDeserializer.java
new file mode 100644
index 0000000..3e75aa5
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyDeserializer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.deserialization;
+
+import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.tdmsg.TDMsgUtils.DEFAULT_TIME_FIELD_NAME;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.flink.Record;
+import org.apache.inlong.sort.flink.SerializedRecord;
+import org.apache.inlong.sort.formats.base.TableFormatConstants;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.tdmsgcsv.TDMsgCsvFormatDeserializer;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.util.CommonUtils;
+
+public class MultiTenancyDeserializer implements DataFlowInfoListener, Deserializer<SerializedRecord, Record> {
+ /**
+ * Data flow id -> Deserializer.
+ */
+ private final Map<Long, Deserializer<SerializedRecord, Record>> deserializers = new HashMap<>();
+
+ @Override
+ public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+ updateDataFlow(dataFlowInfo);
+ }
+
+ @Override
+ public void updateDataFlow(DataFlowInfo dataFlowInfo) {
+ final DeserializationInfo deserializationInfo = dataFlowInfo.getSourceInfo().getDeserializationInfo();
+
+ final Deserializer<SerializedRecord, Record> deserializer = generateDeserializer(
+ dataFlowInfo.getSourceInfo().getFields(), deserializationInfo);
+
+ deserializers.put(dataFlowInfo.getId(), deserializer);
+ }
+
+ @Override
+ public void removeDataFlow(DataFlowInfo dataFlowInfo) {
+ deserializers.remove(dataFlowInfo.getId());
+ }
+
+ @Override
+ public void deserialize(SerializedRecord record, Collector<Record> collector) throws Exception {
+ final Deserializer<SerializedRecord, Record> deserializer = deserializers.get(record.getDataFlowId());
+ if (deserializer == null) {
+ throw new Exception("No schema found for data flow:" + record.getDataFlowId());
+ }
+ deserializer.deserialize(record, collector);
+ }
+
+ @VisibleForTesting
+ Deserializer<SerializedRecord, Record> generateDeserializer(
+ FieldInfo[] fields,
+ DeserializationInfo deserializationInfo) {
+
+ final RowFormatInfo rowFormatInfo = CommonUtils.generateRowFormatInfo(fields);
+
+ final Deserializer<SerializedRecord, Record> deserializer;
+ if (deserializationInfo instanceof TDMsgCsvDeserializationInfo) {
+ TDMsgCsvDeserializationInfo tdMsgCsvDeserializationInfo = (TDMsgCsvDeserializationInfo) deserializationInfo;
+ TDMsgCsvFormatDeserializer tdMsgCsvFormatDeserializer = new TDMsgCsvFormatDeserializer(
+ rowFormatInfo,
+ DEFAULT_TIME_FIELD_NAME,
+ DEFAULT_ATTRIBUTES_FIELD_NAME,
+ TableFormatConstants.DEFAULT_CHARSET,
+ tdMsgCsvDeserializationInfo.getDelimiter(),
+ null,
+ null,
+ null,
+ tdMsgCsvDeserializationInfo.isDeleteHeadDelimiter(),
+ TableFormatConstants.DEFAULT_IGNORE_ERRORS);
+ deserializer = new TDMsgDeserializer(tdMsgCsvFormatDeserializer);
+ } else {
+ // TODO, support more formats here
+ throw new UnsupportedOperationException(
+ "Not supported yet " + deserializationInfo.getClass().getSimpleName());
+ }
+
+ return deserializer;
+ }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializer.java
index 0b5b2b5..080c4f7 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializer.java
@@ -27,7 +27,7 @@ import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
+import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
import org.apache.inlong.sort.formats.common.RowFormatInfo;
import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgMixedFormatDeserializer;
import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedFormatConverter;
@@ -39,6 +39,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
import org.apache.inlong.sort.protocol.deserialization.TDMsgDeserializationInfo;
+import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
import org.apache.inlong.sort.util.CommonUtils;
@@ -46,7 +47,7 @@ import org.apache.inlong.sort.util.CommonUtils;
* A deserializer to handle mixed TDMsg records.
*/
public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
- Deserializer<TDMsgSerializedRecord, Record> {
+ Deserializer<TDMsgMixedSerializedRecord, Record> {
/**
* Maps topic to mixed deserializer.
@@ -71,10 +72,17 @@ public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
final AbstractTDMsgMixedFormatDeserializer preDeserializer = allDeserializer.getLeft();
final TDMsgMixedFormatConverter deserializer = allDeserializer.getRight();
- // currently only tubeMQ source supports TDMsg format
- final TubeSourceInfo tubeSourceInfo = (TubeSourceInfo) dataFlowInfo.getSourceInfo();
+ final String topic;
+ if (dataFlowInfo.getSourceInfo() instanceof TubeSourceInfo) {
+ topic = ((TubeSourceInfo) dataFlowInfo.getSourceInfo()).getTopic();
+ } else if (dataFlowInfo.getSourceInfo() instanceof PulsarSourceInfo) {
+ topic = ((PulsarSourceInfo) dataFlowInfo.getSourceInfo()).getTopic();
+ } else {
+ throw new UnsupportedOperationException("Unknown source type " + dataFlowInfo.getSourceInfo());
+ }
+
final TDMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap
- .computeIfAbsent(tubeSourceInfo.getTopic(), topic -> new TDMsgMixedDeserializer());
+ .computeIfAbsent(topic, key -> new TDMsgMixedDeserializer());
mixedDeserializer.updateDataFlow(
dataFlowInfo.getId(), tdMsgDeserializationInfo.getTid(), preDeserializer, deserializer);
}
@@ -102,7 +110,7 @@ public class MultiTenancyTDMsgMixedDeserializer implements DataFlowInfoListener,
return deserializationInfo instanceof TDMsgDeserializationInfo;
}
- public void deserialize(TDMsgSerializedRecord record, Collector<Record> collector) throws Exception {
+ public void deserialize(TDMsgMixedSerializedRecord record, Collector<Record> collector) throws Exception {
final String topic = record.getTopic();
final TDMsgMixedDeserializer mixedDeserializer = mixedDeserializerMap.get(topic);
if (mixedDeserializer == null) {
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgDeserializer.java
new file mode 100644
index 0000000..bda542e
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgDeserializer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.deserialization;
+
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.flink.Record;
+import org.apache.inlong.sort.flink.SerializedRecord;
+import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgFormatDeserializer;
+
+public class TDMsgDeserializer implements Deserializer<SerializedRecord, Record> {
+ private final AbstractTDMsgFormatDeserializer innerDeserializer;
+
+ public TDMsgDeserializer(AbstractTDMsgFormatDeserializer innerDeserializer) {
+ this.innerDeserializer = innerDeserializer;
+ }
+
+ @Override
+ public void deserialize(SerializedRecord input, Collector<Record> collector) throws Exception {
+ innerDeserializer.flatMap(input.getData(), new CallbackCollector<>(
+ row -> collector.collect(new Record(
+ input.getDataFlowId(),
+ input.getTimestampMillis(),
+ row))));
+ }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
index aff45ce..d396cb7 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
@@ -24,7 +24,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
+import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgFormatDeserializer;
import org.apache.inlong.sort.formats.tdmsg.TDMsgMixedFormatConverter;
import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
@@ -32,7 +32,7 @@ import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
/**
* A deserializer to handle mixed TDMsg records of one topic.
*/
-public class TDMsgMixedDeserializer implements Deserializer<TDMsgSerializedRecord, Record> {
+public class TDMsgMixedDeserializer implements Deserializer<TDMsgMixedSerializedRecord, Record> {
/**
* Each topic should have same preDeserializer, so just keep one.
@@ -76,7 +76,7 @@ public class TDMsgMixedDeserializer implements Deserializer<TDMsgSerializedRecor
}
@Override
- public void deserialize(TDMsgSerializedRecord tdMsgRecord, Collector<Record> collector) throws Exception {
+ public void deserialize(TDMsgMixedSerializedRecord tdMsgRecord, Collector<Record> collector) throws Exception {
preDeserializer.flatMap(tdMsgRecord.getData(), new CallbackCollector<>(mixedRow -> {
final String tid = TDMsgUtils.getTidFromMixedRow(mixedRow);
final Set<Long> dataFlowIds = interface2DataFlowsMap.get(tid);
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
index 028a89a..9601541 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
@@ -42,7 +42,7 @@ import org.apache.flink.util.TimeUtils;
import org.apache.inlong.sort.configuration.Configuration;
import org.apache.inlong.sort.configuration.Constants;
import org.apache.inlong.sort.flink.SerializedRecord;
-import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
+import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
import org.apache.inlong.sort.meta.MetaManager;
import org.apache.inlong.sort.util.CommonUtils;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
@@ -396,7 +396,7 @@ public class MultiTenancyTubeConsumer {
synchronized (context.getCheckpointLock()) {
for (Message message : consumeResult.getMessageList()) {
// TODO, optimize for single tid or no tid topic
- context.collect(new TDMsgSerializedRecord(
+ context.collect(new TDMsgMixedSerializedRecord(
topic, System.currentTimeMillis(), message.getData()));
}
final String partitionKey = consumeResult.getPartitionKey();
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
index 37dddfc..fddb22c 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.inlong.commons.msg.TDMsg1;
import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
+import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.formats.tdmsg.TDMsgUtils;
@@ -85,7 +85,7 @@ public class MultiTenancyTDMsgMixedDeserializerTest extends TestLogger {
tdMsg1.addMsg(attrs, body1.getBytes());
final TestingCollector<Record> collector = new TestingCollector<>();
- deserializer.deserialize(new TDMsgSerializedRecord("topic", 0, tdMsg1.buildArray()), collector);
+ deserializer.deserialize(new TDMsgMixedSerializedRecord("topic", 0, tdMsg1.buildArray()), collector);
assertEquals(1, collector.results.size());
assertEquals(1L, collector.results.get(0).getDataflowId());
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializerTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializerTest.java
index 8380f3d..7ed9c9b 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializerTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializerTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.flink.Record;
-import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
+import org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
import org.apache.inlong.sort.formats.tdmsg.AbstractTDMsgFormatDeserializer;
import org.apache.inlong.sort.formats.tdmsg.TDMsgBody;
import org.apache.inlong.sort.formats.tdmsg.TDMsgHead;
@@ -93,7 +93,7 @@ public class TDMsgMixedDeserializerTest extends TestLogger {
row.setField(2, tid);
preDeserializer.records.add(row);
- mixedDeserializer.deserialize(new TDMsgSerializedRecord(), collector);
+ mixedDeserializer.deserialize(new TDMsgMixedSerializedRecord(), collector);
assertEquals(2, collector.results.size());
assertEquals(dataFlowId1, collector.results.get(0).getDataflowId());
assertEquals(tid, collector.results.get(0).getRow().getField(2));