You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/08/23 09:12:05 UTC

[incubator-inlong] branch master updated: [INLONG-746] Support metric system in inlong-sort (#1335) (#1461)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a3c3836  [INLONG-746] Support metric system in inlong-sort (#1335) (#1461)
a3c3836 is described below

commit a3c38365fb29bcff27dc328a9701a3585a57f10a
Author: chantccc <52...@users.noreply.github.com>
AuthorDate: Mon Aug 23 17:11:59 2021 +0800

    [INLONG-746] Support metric system in inlong-sort (#1335) (#1461)
    
    Co-authored-by: tianqiwan <ti...@tencent.com>
---
 .../inlong/sort/configuration/Constants.java       |  34 ++++
 .../org/apache/inlong/sort/flink/Entrance.java     |  43 +++++
 .../java/org/apache/inlong/sort/flink/Record.java  |  18 ++-
 .../apache/inlong/sort/flink/SerializedRecord.java |  14 +-
 .../inlong/sort/flink/TDMsgSerializedRecord.java   |   4 +-
 .../deserialization/DeserializationSchema.java     |  67 +++++++-
 .../deserialization/TDMsgMixedDeserializer.java    |   2 +-
 .../inlong/sort/flink/metrics/MetricData.java      | 176 +++++++++++++++++++++
 .../sort/flink/metrics/MetricsAggregator.java      | 118 ++++++++++++++
 .../MetricsAssignerWithPeriodicWatermarks.java}    |  41 ++---
 .../inlong/sort/flink/metrics/MetricsLogSink.java  |  71 +++++++++
 .../transformation/FieldMappingTransformer.java    |  15 +-
 .../flink/transformation/RecordTransformer.java    |   5 +-
 .../flink/tubemq/MultiTenancyTubeConsumer.java     |   3 +-
 .../MultiTenancyTDMsgMixedDeserializerTest.java    |   2 +-
 .../inlong/sort/flink/hive/HiveSinkITCase.java     |   3 +-
 .../inlong/sort/flink/metrics/MetricDataTest.java} |  44 ++----
 .../sort/flink/metrics/MetricsLogSinkTest.java     |  58 +++++++
 .../FieldMappingTransformerTest.java               |   4 +-
 .../transformation/RecordTransformerTest.java      |  12 +-
 20 files changed, 655 insertions(+), 79 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index 7ecbae2..aa9391e 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -32,6 +32,8 @@ public class Constants {
 
     public static final String SINK_TYPE_HIVE = "hive";
 
+    public static final String METRIC_DATA_OUTPUT_TAG_ID = "metric_data_side_output";
+
     // ------------------------------------------------------------------------
     //  Operator uid
     // ------------------------------------------------------------------------
@@ -241,4 +243,36 @@ public class Constants {
     public static final ConfigOption<Integer> CHECKPOINT_TIMEOUT_MS =
         key("checkpoint.timeout.ms")
             .defaultValue(600000);
+
+    // ------------------------------------------------------------------------
+    //  Metrics related
+    // ------------------------------------------------------------------------
+    public static final ConfigOption<Boolean> METRICS_ENABLE_OUTPUT =
+        key("metrics.enable.output")
+            .defaultValue(true);
+
+    public static final ConfigOption<Integer> METRICS_TIMESTAMP_WATERMARK_ASSIGNER_PARALLELISM =
+        key("metrics.timestamp.watermark.assigner.parallelism")
+            .defaultValue(1);
+
+    public static final ConfigOption<Integer> METRICS_AGGREGATOR_PARALLELISM =
+        key("metrics.aggregator.parallelism")
+            .defaultValue(1);
+
+    public static final ConfigOption<Integer> METRICS_SINK_PARALLELISM =
+        key("metrics.sink.parallelism")
+            .defaultValue(1)
+            .withDeprecatedKeys("metrics.mysql.sink.parallelism");
+
+    public static final String METRICS_TIMESTAMP_AND_WATERMARK_ASSIGNER_UID
+        = "metrics_timestamp_and_watermark_assigner_uid";
+
+    public static final String METRICS_AGGREGATOR_UID = "metrics_aggregator_uid";
+
+    public static final String METRICS_SINK_UID = "metrics_sink_uid";
+
+    public static final ConfigOption<Integer> METRICS_AGGREGATOR_WINDOW_SIZE =
+        key("metrics.aggregator.window.size")
+            .defaultValue(5)
+            .withDescription("minutes");
 }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Entrance.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Entrance.java
index e7ee47b..2d5c39b 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Entrance.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Entrance.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.flink;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.inlong.sort.configuration.Constants.SINK_TYPE_HIVE;
 
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -30,8 +31,16 @@ import org.apache.inlong.sort.flink.clickhouse.ClickHouseMultiSinkFunction;
 import org.apache.inlong.sort.flink.deserialization.DeserializationSchema;
 import org.apache.inlong.sort.flink.hive.HiveMultiTenantCommitter;
 import org.apache.inlong.sort.flink.hive.HiveMultiTenantWriter;
+import org.apache.inlong.sort.flink.metrics.MetricData;
+import org.apache.inlong.sort.flink.metrics.MetricsAggregator.MetricsAggregateFunction;
+import org.apache.inlong.sort.flink.metrics.MetricsAggregator.MetricsProcessWindowFunction;
+import org.apache.inlong.sort.flink.metrics.MetricsAssignerWithPeriodicWatermarks;
+import org.apache.inlong.sort.flink.metrics.MetricsLogSink;
 import org.apache.inlong.sort.flink.tubemq.MultiTopicTubeSourceFunction;
 import org.apache.inlong.sort.util.ParameterTool;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.OutputTag;
 
 public class Entrance {
 
@@ -58,6 +67,7 @@ public class Entrance {
         env.getCheckpointConfig().setCheckpointTimeout(config.getInteger(Constants.CHECKPOINT_TIMEOUT_MS));
         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 
+        // Data stream
         DataStream<SerializedRecord> sourceStream;
         if (sourceType.equals(Constants.SOURCE_TYPE_TUBE)) {
             sourceStream = env
@@ -95,6 +105,39 @@ public class Entrance {
             throw new IllegalArgumentException("Unsupported sink type " + sinkType);
         }
 
+        // Metric stream
+        final boolean enableOutputMetrics = config.getBoolean(Constants.METRICS_ENABLE_OUTPUT);
+        if (enableOutputMetrics) {
+            final int metricsAggregatorParallelism = config.getInteger(Constants.METRICS_AGGREGATOR_PARALLELISM);
+            final int metricsTimestampWatermarkAssignerParallelism = config
+                    .getInteger(Constants.METRICS_TIMESTAMP_WATERMARK_ASSIGNER_PARALLELISM);
+            final int metricsMySQLSinkParallelism = config.getInteger(Constants.METRICS_SINK_PARALLELISM);
+            final OutputTag<MetricData> outputTag = new OutputTag<MetricData>(Constants.METRIC_DATA_OUTPUT_TAG_ID) {};
+
+            final DataStream<MetricData> metricsDataStream = deserializationStream
+                    .getSideOutput(outputTag)
+                    .assignTimestampsAndWatermarks(new MetricsAssignerWithPeriodicWatermarks())
+                    .setParallelism(metricsTimestampWatermarkAssignerParallelism)
+                    .uid(Constants.METRICS_TIMESTAMP_AND_WATERMARK_ASSIGNER_UID)
+                    .name("Metrics timestamp/watermark assigner");
+
+            final DataStream<MetricData> metricsAggregatorStream = metricsDataStream
+                    .keyBy((KeySelector<MetricData, String>) MetricData::getKey)
+                    .window(TumblingEventTimeWindows.of(
+                            Time.minutes(config.getInteger(Constants.METRICS_AGGREGATOR_WINDOW_SIZE))))
+                    .allowedLateness(Time.milliseconds(Long.MAX_VALUE))
+                    .aggregate(new MetricsAggregateFunction(), new MetricsProcessWindowFunction())
+                    .setParallelism(metricsAggregatorParallelism)
+                    .uid(Constants.METRICS_AGGREGATOR_UID)
+                    .name("Metrics aggregator");
+
+            metricsAggregatorStream
+                .addSink(new MetricsLogSink())
+                .setParallelism(metricsMySQLSinkParallelism)
+                .uid(Constants.METRICS_SINK_UID)
+                .name("Metrics sink");
+        }
+
         env.execute(clusterId);
     }
 }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Record.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Record.java
index 3b8707c..e5e16ba 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Record.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Record.java
@@ -32,6 +32,9 @@ public class Record implements Serializable {
 
     private long dataflowId;
 
+    // Event time
+    private long timestampMillis;
+
     private Row row;
 
     /**
@@ -41,8 +44,9 @@ public class Record implements Serializable {
 
     }
 
-    public Record(long dataflowId, Row row) {
+    public Record(long dataflowId, long timestampMillis, Row row) {
         this.dataflowId = dataflowId;
+        this.timestampMillis = timestampMillis;
         this.row = checkNotNull(row);
     }
 
@@ -54,6 +58,14 @@ public class Record implements Serializable {
         this.dataflowId = dataflowId;
     }
 
+    public long getTimestampMillis() {
+        return timestampMillis;
+    }
+
+    public void setTimestampMillis(long timestampMillis) {
+        this.timestampMillis = timestampMillis;
+    }
+
     public Row getRow() {
         return row;
     }
@@ -73,6 +85,8 @@ public class Record implements Serializable {
         }
 
         Record that = (Record) o;
-        return dataflowId == ((Record) o).dataflowId && Objects.equals(row, that.row);
+        return dataflowId == that.dataflowId
+                && Objects.equals(row, that.row)
+                && timestampMillis == that.timestampMillis;
     }
 }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/SerializedRecord.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/SerializedRecord.java
index 1e8ad87..bb471f4 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/SerializedRecord.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/SerializedRecord.java
@@ -25,6 +25,9 @@ public class SerializedRecord implements Serializable {
 
     private long dataFlowId;
 
+    // Event time
+    private long timestampMillis;
+
     private byte[] data;
 
     /**
@@ -34,8 +37,9 @@ public class SerializedRecord implements Serializable {
 
     }
 
-    public SerializedRecord(long dataFlowId, byte[] data) {
+    public SerializedRecord(long dataFlowId, long timestampMillis, byte[] data) {
         this.dataFlowId = dataFlowId;
+        this.timestampMillis = timestampMillis;
         this.data = data;
     }
 
@@ -43,6 +47,10 @@ public class SerializedRecord implements Serializable {
         this.dataFlowId = dataFlowId;
     }
 
+    public void setTimestampMillis(long timestampMillis) {
+        this.timestampMillis = timestampMillis;
+    }
+
     public void setData(byte[] data) {
         this.data = data;
     }
@@ -51,6 +59,10 @@ public class SerializedRecord implements Serializable {
         return dataFlowId;
     }
 
+    public long getTimestampMillis() {
+        return timestampMillis;
+    }
+
     public byte[] getData() {
         return data;
     }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
index 3928d76..37feba0 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
@@ -35,8 +35,8 @@ public class TDMsgSerializedRecord extends SerializedRecord {
         super();
     }
 
-    public TDMsgSerializedRecord(String topic, byte[] data) {
-        super(UNKNOWN_DATAFLOW_ID, data);
+    public TDMsgSerializedRecord(String topic, long timestampMillis, byte[] data) {
+        super(UNKNOWN_DATAFLOW_ID, timestampMillis, data);
         this.topic = topic;
     }
 
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
index eb8e0e4..ae194bc 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
@@ -27,18 +27,27 @@ import org.apache.inlong.sort.configuration.Constants;
 import org.apache.inlong.sort.flink.Record;
 import org.apache.inlong.sort.flink.SerializedRecord;
 import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
+import org.apache.inlong.sort.flink.metrics.MetricData;
+import org.apache.inlong.sort.flink.metrics.MetricData.MetricSource;
+import org.apache.inlong.sort.flink.metrics.MetricData.MetricType;
 import org.apache.inlong.sort.flink.transformation.FieldMappingTransformer;
 import org.apache.inlong.sort.flink.transformation.RecordTransformer;
 import org.apache.inlong.sort.meta.MetaManager;
 import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
 import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.flink.util.OutputTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class DeserializationSchema extends ProcessFunction<SerializedRecord, SerializedRecord> {
 
+    private static final long serialVersionUID = 5380421870587560943L;
+
     private static final Logger LOG = LoggerFactory.getLogger(DeserializationSchema.class);
 
+    private static final OutputTag<MetricData> METRIC_DATA_OUTPUT_TAG
+            = new OutputTag<MetricData>(Constants.METRIC_DATA_OUTPUT_TAG_ID) {};
+
     private transient RecordTransformer recordTransformer;
 
     private transient FieldMappingTransformer fieldMappingTransformer;
@@ -51,6 +60,8 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
 
     private transient MetaManager metaManager;
 
+    private transient Boolean enableOutputMetrics;
+
     public DeserializationSchema(Configuration config) {
         this.config = Preconditions.checkNotNull(config);
     }
@@ -63,6 +74,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
         recordTransformer = new RecordTransformer(config.getInteger(Constants.ETL_RECORD_SERIALIZATION_BUFFER_SIZE));
         metaManager = MetaManager.getInstance(config);
         metaManager.registerDataFlowInfoListener(new DataFlowInfoListenerImpl());
+        enableOutputMetrics = config.getBoolean(Constants.METRICS_ENABLE_OUTPUT);
     }
 
     @Override
@@ -79,6 +91,20 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
             Context context,
             Collector<SerializedRecord> collector) throws Exception {
         try {
+            if (enableOutputMetrics
+                    && !config.getString(Constants.SOURCE_TYPE).equals(Constants.SOURCE_TYPE_TUBE)) {
+                // If the source is TubeMQ, we do not output package-number metrics
+                final MetricData metricData = new MetricData(
+                        // since sources cannot emit side-outputs, the source metric is emitted here instead
+                        MetricSource.SOURCE,
+                        MetricType.SUCCESSFUL,
+                        serializedRecord.getTimestampMillis(),
+                        serializedRecord.getDataFlowId(),
+                        "",
+                        1L);
+                context.output(METRIC_DATA_OUTPUT_TAG, metricData);
+            }
+
             if (serializedRecord instanceof TDMsgSerializedRecord
                     && serializedRecord.getDataFlowId() == UNKNOWN_DATAFLOW_ID) {
                 final TDMsgSerializedRecord tdmsgRecord = (TDMsgSerializedRecord) serializedRecord;
@@ -87,15 +113,52 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
                             tdmsgRecord,
                             new CallbackCollector<>(sourceRecord -> {
                                 final Record sinkRecord = fieldMappingTransformer.transform(sourceRecord);
+
+                                if (enableOutputMetrics) {
+                                    MetricData metricData = new MetricData(
+                                            // TODO, outputs this metric in Sink Function
+                                            MetricSource.SINK,
+                                            MetricType.SUCCESSFUL,
+                                            sinkRecord.getTimestampMillis(),
+                                            sinkRecord.getDataflowId(),
+                                            "",
+                                            1);
+
+                                    context.output(METRIC_DATA_OUTPUT_TAG, metricData);
+                                }
+
                                 collector.collect(recordTransformer.toSerializedRecord(sinkRecord));
                             }));
                 }
             } else {
-                // TODO, support metrics and more data types
+                // TODO, support more data types
+                if (enableOutputMetrics
+                        && !config.getString(Constants.SOURCE_TYPE).equals(Constants.SOURCE_TYPE_TUBE)) {
+                    MetricData metricData = new MetricData(
+                            MetricSource.DESERIALIZATION,
+                            MetricType.ABANDONED,
+                            serializedRecord.getTimestampMillis(),
+                            serializedRecord.getDataFlowId(),
+                            "Unsupported schema",
+                            1);
+                    context.output(METRIC_DATA_OUTPUT_TAG, metricData);
+                }
+
                 LOG.warn("Abandon data due to unsupported record {}", serializedRecord);
             }
         } catch (Exception e) {
-            // TODO, support metrics
+            if (enableOutputMetrics
+                    && !config.getString(Constants.SOURCE_TYPE).equals(Constants.SOURCE_TYPE_TUBE)) {
+                MetricData metricData = new MetricData(
+                        MetricSource.DESERIALIZATION,
+                        MetricType.ABANDONED,
+                        serializedRecord.getTimestampMillis(),
+                        serializedRecord.getDataFlowId(),
+                        (e.getMessage() == null || e.getMessage().isEmpty()) ? "Exception caught" : e.getMessage(),
+                        1);
+                context.output(METRIC_DATA_OUTPUT_TAG, metricData);
+            }
+
             LOG.warn("Abandon data", e);
         }
     }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
index 62cf707..aff45ce 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
@@ -90,7 +90,7 @@ public class TDMsgMixedDeserializer implements Deserializer<TDMsgSerializedRecor
             deserializer.flatMap(mixedRow, new CallbackCollector<>((row -> {
                 // each tid might be associated with multiple data flows
                 for (long dataFlowId : dataFlowIds) {
-                    collector.collect(new Record(dataFlowId, row));
+                    collector.collect(new Record(dataFlowId, System.currentTimeMillis(), row));
                 }
             })));
         }));
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricData.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricData.java
new file mode 100644
index 0000000..0099c1b
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricData.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.metrics;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+
+public class MetricData implements Serializable {
+
+    private static final long serialVersionUID = -3294877245954460452L;
+
+    private MetricSource metricSource;
+
+    private MetricType metricType;
+
+    /**
+     * If the instance is a raw MetricData, this field represents the event time in millis of this metric.
+     *
+     * MetricData will be aggregated into different time windows,
+     * if the instance is an aggregated MetricData, this field represents the start timestamp
+     * of the aggregated window in millis.
+     */
+    private long timestampMillis;
+
+    private long dataFlowId;
+
+    private String attachment;
+
+    // User defined partition, support multi-level partitions
+    private String partitions;
+
+    private long count;
+
+    public MetricData() {
+    }
+
+    public MetricData(
+            MetricSource metricSource,
+            MetricType metricType,
+            long timestampMillis,
+            long dataFlowId,
+            String attachment,
+            String partitions,
+            long count) {
+        this.metricSource = Preconditions.checkNotNull(metricSource);
+        this.metricType = Preconditions.checkNotNull(metricType);
+        this.timestampMillis = timestampMillis;
+        this.dataFlowId = dataFlowId;
+        this.attachment = Preconditions.checkNotNull(attachment);
+        this.partitions = Preconditions.checkNotNull(partitions);
+        this.count = count;
+    }
+
+    public MetricData(
+        MetricSource metricSource,
+        MetricType metricType,
+        long timestampMillis,
+        long dataFlowId,
+        String attachment,
+        long count) {
+        this(metricSource,
+                metricType,
+                timestampMillis,
+                dataFlowId,
+                attachment,
+                "",
+                count);
+    }
+
+    public enum MetricType {
+        SUCCESSFUL,
+        ABANDONED
+    }
+
+    public enum MetricSource {
+        SOURCE,
+        DESERIALIZATION,
+        TRANSFORMER,
+        SINK,
+        COMMITTER
+    }
+
+    @Override
+    public String toString() {
+        return "metricSource : " + metricSource
+                       + ", metricType : " + metricType
+                       + ", timestampMillis : " + timestampMillis
+                       + ", dataFlowId : " + dataFlowId
+                       + ", attachment : " + attachment
+                       + ", partitions : " + partitions
+                       + ", count : " + count;
+    }
+
+    public String getKey() {
+        return metricSource.name()
+                + "|" + metricType.name()
+                + "|" + dataFlowId
+                + "|" + attachment
+                + "|" + partitions;
+    }
+
+    public MetricSource getMetricSource() {
+        return metricSource;
+    }
+
+    public void setMetricSource(MetricSource metricSource) {
+        this.metricSource = metricSource;
+    }
+
+    public MetricType getMetricType() {
+        return metricType;
+    }
+
+    public void setMetricType(MetricType metricType) {
+        this.metricType = metricType;
+    }
+
+    public long getTimestampMillis() {
+        return timestampMillis;
+    }
+
+    public void setTimestampMillis(long timestampMillis) {
+        this.timestampMillis = timestampMillis;
+    }
+
+    public long getDataFlowId() {
+        return dataFlowId;
+    }
+
+    public void setDataFlowId(long dataFlowId) {
+        this.dataFlowId = dataFlowId;
+    }
+
+    public String getAttachment() {
+        return attachment;
+    }
+
+    public void setAttachment(String attachment) {
+        this.attachment = attachment;
+    }
+
+    public String getPartitions() {
+        return partitions;
+    }
+
+    public void setPartitions(String partitions) {
+        this.partitions = partitions;
+    }
+
+    public long getCount() {
+        return count;
+    }
+
+    public void setCount(long count) {
+        this.count = count;
+    }
+
+    public void increase(long count) {
+        this.count += count;
+    }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsAggregator.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsAggregator.java
new file mode 100644
index 0000000..645c64d
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsAggregator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.metrics;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+public class MetricsAggregator {
+
+    /**
+     * Aggregate Metrics.
+     */
+    public static class MetricsAggregateFunction implements
+            AggregateFunction<MetricData, MetricAccumulator, MetricData> {
+
+        private static final long serialVersionUID = -4980743351369740395L;
+
+        @Override
+        public MetricAccumulator createAccumulator() {
+            return new MetricAccumulator();
+        }
+
+        @Override
+        public MetricAccumulator add(MetricData value, MetricAccumulator accumulator) {
+            if (accumulator.getMetricData() == null) {
+                MetricData metricData = new MetricData(
+                        value.getMetricSource(),
+                        value.getMetricType(),
+                        value.getTimestampMillis(),
+                        value.getDataFlowId(),
+                        value.getAttachment(),
+                        value.getPartitions(),
+                        value.getCount());
+                accumulator.setMetricData(metricData);
+            } else {
+                accumulator.getMetricData().increase(value.getCount());
+            }
+
+            return accumulator;
+        }
+
+        @Override
+        public MetricData getResult(MetricAccumulator accumulator) {
+            return accumulator.getMetricData();
+        }
+
+        @Override
+        public MetricAccumulator merge(MetricAccumulator a, MetricAccumulator b) {
+            a.getMetricData().increase(b.getMetricData().getCount());
+            return a;
+        }
+    }
+
+    /**
+     * Attach partition (window start) to the aggregated MetricData.
+     */
+    public static class MetricsProcessWindowFunction extends
+            ProcessWindowFunction<MetricData, MetricData, String, TimeWindow> {
+
+        private static final long serialVersionUID = 914156043444186657L;
+
+        @Override
+        public void process(
+                String key,
+                Context context,
+                Iterable<MetricData> elements,
+                Collector<MetricData> collector) {
+            Iterator<MetricData> it = elements.iterator();
+
+            if (!it.hasNext()) {
+                return;
+            }
+
+            MetricData result = it.next();
+            while (it.hasNext()) {
+                result.increase(it.next().getCount());
+            }
+            result.setTimestampMillis(context.window().getStart());
+            collector.collect(result);
+        }
+    }
+
+    private static class MetricAccumulator implements Serializable {
+
+        private static final long serialVersionUID = -7368055708339786552L;
+
+        private MetricData metricData;
+
+        public void setMetricData(MetricData metricData) {
+            this.metricData = checkNotNull(metricData);
+        }
+
+        public MetricData getMetricData() {
+            return metricData;
+        }
+    }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsAssignerWithPeriodicWatermarks.java
similarity index 52%
copy from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
copy to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsAssignerWithPeriodicWatermarks.java
index 3928d76..4fa4757 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsAssignerWithPeriodicWatermarks.java
@@ -15,36 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.flink;
+package org.apache.inlong.sort.flink.metrics;
 
-import static org.apache.inlong.sort.configuration.Constants.UNKNOWN_DATAFLOW_ID;
+import javax.annotation.Nullable;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
 
-/**
- * Data flow id might not been got from mixed TDMsg data stream.
- */
-public class TDMsgSerializedRecord extends SerializedRecord {
-
-    private static final long serialVersionUID = 4075321919886376829L;
-
-    private String topic;
-
-    /**
-     * Just satisfy requirement of Flink Pojo definition.
-     */
-    public TDMsgSerializedRecord() {
-        super();
-    }
-
-    public TDMsgSerializedRecord(String topic, byte[] data) {
-        super(UNKNOWN_DATAFLOW_ID, data);
-        this.topic = topic;
-    }
+public class MetricsAssignerWithPeriodicWatermarks implements AssignerWithPeriodicWatermarks<MetricData> {
+    private long currentMaxTimestamp;
 
-    public String getTopic() {
-        return topic;
+    @Nullable
+    @Override
+    public Watermark getCurrentWatermark() {
+        return new Watermark(currentMaxTimestamp);
     }
 
-    public void setTopic(String topic) {
-        this.topic = topic;
+    @Override
+    public long extractTimestamp(MetricData metricData, long previousElementTimestamp) {
+        long timestamp = metricData.getTimestampMillis();
+        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
+        return timestamp;
     }
 }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsLogSink.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsLogSink.java
new file mode 100644
index 0000000..5b4b5f0
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsLogSink.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.metrics;
+
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetricsLogSink extends RichSinkFunction<MetricData> implements CheckpointedFunction {
+
+    private static final long serialVersionUID = -2741257263371632560L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(MetricsLogSink.class);
+
+    private transient SimpleDateFormat partitionSecondDateFormat;
+
+    private transient SimpleDateFormat partitionDayDateFormat;
+
+    public MetricsLogSink() {
+    }
+
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        partitionSecondDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        partitionDayDateFormat = new SimpleDateFormat("yyyyMMdd");
+    }
+
+    @Override
+    public void invoke(MetricData metricData, Context context) throws Exception {
+        Timestamp timestamp = new Timestamp(metricData.getTimestampMillis());
+        LOG.info("Record metric, partition_day = '{}', partition_second = '{}', metric_source = '{}',"
+                + " metric_type = '{}', attachment = '{}', dataflow_id = '{}', user_partition = '{}', count = '{}'",
+                Long.valueOf(partitionDayDateFormat.format(timestamp)),
+                partitionSecondDateFormat.format(timestamp),
+                metricData.getMetricSource(),
+                metricData.getMetricType(),
+                metricData.getAttachment(),
+                metricData.getDataFlowId(),
+                metricData.getPartitions(),
+                metricData.getCount());
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+    }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformer.java
index e4485c2..c4414ae 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformer.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.flink.transformation;
 
+import java.sql.Timestamp;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.flink.types.Row;
@@ -26,6 +27,8 @@ import org.apache.inlong.sort.protocol.DataFlowInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.sink.SinkInfo;
 import org.apache.inlong.sort.protocol.source.SourceInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.inlong.sort.configuration.Constants.DATA_TIME_FIELD;
 
@@ -34,6 +37,8 @@ import static org.apache.inlong.sort.configuration.Constants.DATA_TIME_FIELD;
  */
 public class FieldMappingTransformer implements DataFlowInfoListener {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FieldMappingTransformer.class);
+
     /**
      * Skips time and attribute fields of source record.
      *
@@ -71,7 +76,15 @@ public class FieldMappingTransformer implements DataFlowInfoListener {
                 sinkRow.setField(i, sourceRow.getField(fieldIndex));
             }
         }
-        return new Record(sourceRecord.getDataflowId(), sinkRow);
+
+        long recordTimestampMillis;
+        try {
+            recordTimestampMillis = ((Timestamp) sourceRow.getField(0)).getTime();
+        } catch (Exception e) {
+            recordTimestampMillis = System.currentTimeMillis();
+            LOG.warn("Failed to extract timestamp from data.", e);
+        }
+        return new Record(sourceRecord.getDataflowId(), recordTimestampMillis, sinkRow);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/RecordTransformer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/RecordTransformer.java
index ba0b36e..6facf74 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/RecordTransformer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/RecordTransformer.java
@@ -78,6 +78,7 @@ public class RecordTransformer implements DataFlowInfoListener {
         try {
             rowSerializer.serialize(newRecord.getRow(), dataOutputSerializer);
             serializedRecord = new SerializedRecord(dataFlowId,
+                    record.getTimestampMillis(),
                     dataOutputSerializer.getCopyOfBuffer());
             dataOutputSerializer.pruneBuffer();
         } catch (Exception e) {
@@ -101,7 +102,7 @@ public class RecordTransformer implements DataFlowInfoListener {
         } finally {
             dataInputDeserializer.releaseArrays();
         }
-        return new Record(dataFlowId, row);
+        return new Record(dataFlowId, serializedRecord.getTimestampMillis(), row);
     }
 
     private RowSerializer getRowSerializer(long dataFlowId) throws Exception {
@@ -127,7 +128,7 @@ public class RecordTransformer implements DataFlowInfoListener {
                 newRow.setField(i, null);
             }
         }
-        return new Record(oldRecord.getDataflowId(), newRow);
+        return new Record(oldRecord.getDataflowId(), oldRecord.getTimestampMillis(), newRow);
     }
 
     @VisibleForTesting
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
index 09b13c0..028a89a 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
@@ -396,7 +396,8 @@ public class MultiTenancyTubeConsumer {
                 synchronized (context.getCheckpointLock()) {
                     for (Message message : consumeResult.getMessageList()) {
                         // TODO, optimize for single tid or no tid topic
-                        context.collect(new TDMsgSerializedRecord(topic, message.getData()));
+                        context.collect(new TDMsgSerializedRecord(
+                                topic, System.currentTimeMillis(), message.getData()));
                     }
                     final String partitionKey = consumeResult.getPartitionKey();
                     final long offset = consumeResult.getCurrOffset();
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
index 99a0d89..397544e 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
@@ -84,7 +84,7 @@ public class MultiTenancyTDMsgMixedDeserializerTest extends TestLogger {
         tdMsg1.addMsg(attrs, body1.getBytes());
 
         final TestingCollector<Record> collector = new TestingCollector<>();
-        deserializer.deserialize(new TDMsgSerializedRecord("topic", tdMsg1.buildArray()), collector);
+        deserializer.deserialize(new TDMsgSerializedRecord("topic", 0, tdMsg1.buildArray()), collector);
 
         assertEquals(1, collector.results.size());
         assertEquals(1L, collector.results.get(0).getDataflowId());
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/hive/HiveSinkITCase.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/hive/HiveSinkITCase.java
index dd6f7dd..1d60913 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/hive/HiveSinkITCase.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/hive/HiveSinkITCase.java
@@ -380,7 +380,8 @@ public class HiveSinkITCase extends TestLogger {
                 row.setField(1, fieldValue2);
                 row.setField(2, line[0]);
                 row.setField(3, line[1]);
-                sourceContext.collect(transformer.toSerializedRecord(new Record(dataFlowInfo.getId(), row)));
+                sourceContext.collect(transformer.toSerializedRecord(
+                        new Record(dataFlowInfo.getId(), System.currentTimeMillis(), row)));
             }
             verificationFinishedLatch.await();
         }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/metrics/MetricDataTest.java
similarity index 51%
copy from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
copy to inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/metrics/MetricDataTest.java
index 3928d76..0744559 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/metrics/MetricDataTest.java
@@ -15,36 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.flink;
-
-import static org.apache.inlong.sort.configuration.Constants.UNKNOWN_DATAFLOW_ID;
-
-/**
- * Data flow id might not been got from mixed TDMsg data stream.
- */
-public class TDMsgSerializedRecord extends SerializedRecord {
-
-    private static final long serialVersionUID = 4075321919886376829L;
-
-    private String topic;
-
-    /**
-     * Just satisfy requirement of Flink Pojo definition.
-     */
-    public TDMsgSerializedRecord() {
-        super();
-    }
-
-    public TDMsgSerializedRecord(String topic, byte[] data) {
-        super(UNKNOWN_DATAFLOW_ID, data);
-        this.topic = topic;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
+package org.apache.inlong.sort.flink.metrics;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MetricDataTest {
+    @Test
+    public void testPOJO() {
+        Assert.assertEquals(
+                "org.apache.flink.api.java.typeutils.runtime.PojoSerializer",
+                TypeInformation.of(MetricData.class).createSerializer(new ExecutionConfig()).getClass().getName());
     }
 }
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/metrics/MetricsLogSinkTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/metrics/MetricsLogSinkTest.java
new file mode 100644
index 0000000..827c5eb
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/metrics/MetricsLogSinkTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.metrics;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context;
+import org.apache.inlong.sort.flink.metrics.MetricData.MetricSource;
+import org.apache.inlong.sort.flink.metrics.MetricData.MetricType;
+import org.junit.Test;
+
+public class MetricsLogSinkTest {
+    @Test
+    public void test() throws Exception {
+        final org.apache.flink.configuration.Configuration flinkConfig
+                = new org.apache.flink.configuration.Configuration();
+        MetricsLogSink metricsLogSink = new MetricsLogSink();
+        metricsLogSink.open(flinkConfig);
+
+        MetricData metricData = new MetricData(
+                MetricSource.SOURCE,
+                MetricType.SUCCESSFUL,
+                0,
+                1,
+                "attach",
+                1
+        );
+        metricsLogSink.invoke(metricData, new Context() {
+            @Override
+            public long currentProcessingTime() {
+                return 0;
+            }
+
+            @Override
+            public long currentWatermark() {
+                return 0;
+            }
+
+            @Override
+            public Long timestamp() {
+                return null;
+            }
+        });
+    }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformerTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformerTest.java
index 43aab2d..6e37aae 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformerTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformerTest.java
@@ -56,7 +56,7 @@ public class FieldMappingTransformerTest extends TestLogger {
         sourceRow.setField(1, "attr");
         sourceRow.setField(2, "not important");
         sourceRow.setField(3, 9527L);
-        final Record sourceRecord = new Record(dataFlowId, sourceRow);
+        final Record sourceRecord = new Record(dataFlowId, System.currentTimeMillis(), sourceRow);
         final Record sinkRecord = transformer.transform(sourceRecord);
         assertEquals(dataFlowId, sinkRecord.getDataflowId());
         assertEquals(1, sinkRecord.getRow().getArity());
@@ -81,7 +81,7 @@ public class FieldMappingTransformerTest extends TestLogger {
         sourceRow.setField(0, dt);
         sourceRow.setField(1, "attr");
         sourceRow.setField(2, 9527L);
-        final Record sourceRecord = new Record(dataFlowId, sourceRow);
+        final Record sourceRecord = new Record(dataFlowId, System.currentTimeMillis(), sourceRow);
         final Record sinkRecord = transformer.transform(sourceRecord);
         assertEquals(dataFlowId, sinkRecord.getDataflowId());
         assertEquals(2, sinkRecord.getRow().getArity());
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/RecordTransformerTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/RecordTransformerTest.java
index f884e23..f1f9111 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/RecordTransformerTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/RecordTransformerTest.java
@@ -53,7 +53,7 @@ public class RecordTransformerTest extends TestLogger {
         final Row row = new Row(2);
         row.setField(0, 1024L);
         row.setField(1, "9527");
-        final Record record = new Record(1L, row);
+        final Record record = new Record(1L, System.currentTimeMillis(), row);
         final Record transformed = transformer.toRecord(transformer.toSerializedRecord(record));
         assertEquals(record, transformed);
         // check the buffers
@@ -81,7 +81,7 @@ public class RecordTransformerTest extends TestLogger {
         final Row row = new Row(2);
         row.setField(0, 1024L);
         row.setField(1, "9527");
-        final Record record = new Record(1L, row);
+        final Record record = new Record(1L, System.currentTimeMillis(), row);
 
         assertSame(record, transformer.matchRecordAndSerializerField(record, rowSerializers.get(1L)));
     }
@@ -101,7 +101,7 @@ public class RecordTransformerTest extends TestLogger {
 
         final Row oneFieldRow = new Row(1);
         oneFieldRow.setField(0, 1024L);
-        final Record oneFieldRecord = new Record(1L, oneFieldRow);
+        final Record oneFieldRecord = new Record(1L, System.currentTimeMillis(), oneFieldRow);
 
         assertEquals(2,
                 transformer.matchRecordAndSerializerField(oneFieldRecord, rowSerializers.get(1L)).getRow().getArity());
@@ -110,7 +110,7 @@ public class RecordTransformerTest extends TestLogger {
         threeFieldRow.setField(0, 1024L);
         threeFieldRow.setField(1, "9527");
         threeFieldRow.setField(2, 2048);
-        final Record threeFieldRecord = new Record(1L, threeFieldRow);
+        final Record threeFieldRecord = new Record(1L, System.currentTimeMillis(), threeFieldRow);
 
         assertEquals(2,
                 transformer.matchRecordAndSerializerField(threeFieldRecord, rowSerializers.get(1L)).getRow()
@@ -123,7 +123,7 @@ public class RecordTransformerTest extends TestLogger {
         final Row row = new Row(2);
         row.setField(0, 1024L);
         row.setField(1, "9527");
-        final Record record = new Record(1L, row);
+        final Record record = new Record(1L, System.currentTimeMillis(), row);
 
         final int bufferSize = 1024;
         final RecordTransformer transformer = new RecordTransformer(bufferSize);
@@ -165,7 +165,7 @@ public class RecordTransformerTest extends TestLogger {
         final Row row = new Row(2);
         row.setField(0, 1024L);
         row.setField(1, 2048);
-        final Record record = new Record(1L, row);
+        final Record record = new Record(1L, System.currentTimeMillis(), row);
 
         try {
             transformer.toSerializedRecord(record);