You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/08/23 09:12:05 UTC
[incubator-inlong] branch master updated: [INLONG-746] Support
metric system in inlong-sort (#1335) (#1461)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a3c3836 [INLONG-746] Support metric system in inlong-sort (#1335) (#1461)
a3c3836 is described below
commit a3c38365fb29bcff27dc328a9701a3585a57f10a
Author: chantccc <52...@users.noreply.github.com>
AuthorDate: Mon Aug 23 17:11:59 2021 +0800
[INLONG-746] Support metric system in inlong-sort (#1335) (#1461)
Co-authored-by: tianqiwan <ti...@tencent.com>
---
.../inlong/sort/configuration/Constants.java | 34 ++++
.../org/apache/inlong/sort/flink/Entrance.java | 43 +++++
.../java/org/apache/inlong/sort/flink/Record.java | 18 ++-
.../apache/inlong/sort/flink/SerializedRecord.java | 14 +-
.../inlong/sort/flink/TDMsgSerializedRecord.java | 4 +-
.../deserialization/DeserializationSchema.java | 67 +++++++-
.../deserialization/TDMsgMixedDeserializer.java | 2 +-
.../inlong/sort/flink/metrics/MetricData.java | 176 +++++++++++++++++++++
.../sort/flink/metrics/MetricsAggregator.java | 118 ++++++++++++++
.../MetricsAssignerWithPeriodicWatermarks.java} | 41 ++---
.../inlong/sort/flink/metrics/MetricsLogSink.java | 71 +++++++++
.../transformation/FieldMappingTransformer.java | 15 +-
.../flink/transformation/RecordTransformer.java | 5 +-
.../flink/tubemq/MultiTenancyTubeConsumer.java | 3 +-
.../MultiTenancyTDMsgMixedDeserializerTest.java | 2 +-
.../inlong/sort/flink/hive/HiveSinkITCase.java | 3 +-
.../inlong/sort/flink/metrics/MetricDataTest.java} | 44 ++----
.../sort/flink/metrics/MetricsLogSinkTest.java | 58 +++++++
.../FieldMappingTransformerTest.java | 4 +-
.../transformation/RecordTransformerTest.java | 12 +-
20 files changed, 655 insertions(+), 79 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index 7ecbae2..aa9391e 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -32,6 +32,8 @@ public class Constants {
public static final String SINK_TYPE_HIVE = "hive";
+ public static final String METRIC_DATA_OUTPUT_TAG_ID = "metric_data_side_output";
+
// ------------------------------------------------------------------------
// Operator uid
// ------------------------------------------------------------------------
@@ -241,4 +243,36 @@ public class Constants {
public static final ConfigOption<Integer> CHECKPOINT_TIMEOUT_MS =
key("checkpoint.timeout.ms")
.defaultValue(600000);
+
+ // ------------------------------------------------------------------------
+ // Metrics related
+ // ------------------------------------------------------------------------
+ public static final ConfigOption<Boolean> METRICS_ENABLE_OUTPUT =
+ key("metrics.enable.output")
+ .defaultValue(true);
+
+ public static final ConfigOption<Integer> METRICS_TIMESTAMP_WATERMARK_ASSIGNER_PARALLELISM =
+ key("metrics.timestamp.watermark.assigner.parallelism")
+ .defaultValue(1);
+
+ public static final ConfigOption<Integer> METRICS_AGGREGATOR_PARALLELISM =
+ key("metrics.aggregator.parallelism")
+ .defaultValue(1);
+
+ public static final ConfigOption<Integer> METRICS_SINK_PARALLELISM =
+ key("metrics.sink.parallelism")
+ .defaultValue(1)
+ .withDeprecatedKeys("metrics.mysql.sink.parallelism");
+
+ public static final String METRICS_TIMESTAMP_AND_WATERMARK_ASSIGNER_UID
+ = "metrics_timestamp_and_watermark_assigner_uid";
+
+ public static final String METRICS_AGGREGATOR_UID = "metrics_aggregator_uid";
+
+ public static final String METRICS_SINK_UID = "metrics_sink_uid";
+
+ public static final ConfigOption<Integer> METRICS_AGGREGATOR_WINDOW_SIZE =
+ key("metrics.aggregator.window.size")
+ .defaultValue(5)
+ .withDescription("minutes");
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Entrance.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Entrance.java
index e7ee47b..2d5c39b 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Entrance.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Entrance.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.flink;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.inlong.sort.configuration.Constants.SINK_TYPE_HIVE;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -30,8 +31,16 @@ import org.apache.inlong.sort.flink.clickhouse.ClickHouseMultiSinkFunction;
import org.apache.inlong.sort.flink.deserialization.DeserializationSchema;
import org.apache.inlong.sort.flink.hive.HiveMultiTenantCommitter;
import org.apache.inlong.sort.flink.hive.HiveMultiTenantWriter;
+import org.apache.inlong.sort.flink.metrics.MetricData;
+import org.apache.inlong.sort.flink.metrics.MetricsAggregator.MetricsAggregateFunction;
+import org.apache.inlong.sort.flink.metrics.MetricsAggregator.MetricsProcessWindowFunction;
+import org.apache.inlong.sort.flink.metrics.MetricsAssignerWithPeriodicWatermarks;
+import org.apache.inlong.sort.flink.metrics.MetricsLogSink;
import org.apache.inlong.sort.flink.tubemq.MultiTopicTubeSourceFunction;
import org.apache.inlong.sort.util.ParameterTool;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.OutputTag;
public class Entrance {
@@ -58,6 +67,7 @@ public class Entrance {
env.getCheckpointConfig().setCheckpointTimeout(config.getInteger(Constants.CHECKPOINT_TIMEOUT_MS));
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+ // Data stream
DataStream<SerializedRecord> sourceStream;
if (sourceType.equals(Constants.SOURCE_TYPE_TUBE)) {
sourceStream = env
@@ -95,6 +105,39 @@ public class Entrance {
throw new IllegalArgumentException("Unsupported sink type " + sinkType);
}
+ // Metric stream
+ final boolean enableOutputMetrics = config.getBoolean(Constants.METRICS_ENABLE_OUTPUT);
+ if (enableOutputMetrics) {
+ final int metricsAggregatorParallelism = config.getInteger(Constants.METRICS_AGGREGATOR_PARALLELISM);
+ final int metricsTimestampWatermarkAssignerParallelism = config
+ .getInteger(Constants.METRICS_TIMESTAMP_WATERMARK_ASSIGNER_PARALLELISM);
+ final int metricsMySQLSinkParallelism = config.getInteger(Constants.METRICS_SINK_PARALLELISM);
+ final OutputTag<MetricData> outputTag = new OutputTag<MetricData>(Constants.METRIC_DATA_OUTPUT_TAG_ID) {};
+
+ final DataStream<MetricData> metricsDataStream = deserializationStream
+ .getSideOutput(outputTag)
+ .assignTimestampsAndWatermarks(new MetricsAssignerWithPeriodicWatermarks())
+ .setParallelism(metricsTimestampWatermarkAssignerParallelism)
+ .uid(Constants.METRICS_TIMESTAMP_AND_WATERMARK_ASSIGNER_UID)
+ .name("Metrics timestamp/watermark assigner");
+
+ final DataStream<MetricData> metricsAggregatorStream = metricsDataStream
+ .keyBy((KeySelector<MetricData, String>) MetricData::getKey)
+ .window(TumblingEventTimeWindows.of(
+ Time.minutes(config.getInteger(Constants.METRICS_AGGREGATOR_WINDOW_SIZE))))
+ .allowedLateness(Time.milliseconds(Long.MAX_VALUE))
+ .aggregate(new MetricsAggregateFunction(), new MetricsProcessWindowFunction())
+ .setParallelism(metricsAggregatorParallelism)
+ .uid(Constants.METRICS_AGGREGATOR_UID)
+ .name("Metrics aggregator");
+
+ metricsAggregatorStream
+ .addSink(new MetricsLogSink())
+ .setParallelism(metricsMySQLSinkParallelism)
+ .uid(Constants.METRICS_SINK_UID)
+ .name("Metrics sink");
+ }
+
env.execute(clusterId);
}
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Record.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Record.java
index 3b8707c..e5e16ba 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Record.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/Record.java
@@ -32,6 +32,9 @@ public class Record implements Serializable {
private long dataflowId;
+ // Event time
+ private long timestampMillis;
+
private Row row;
/**
@@ -41,8 +44,9 @@ public class Record implements Serializable {
}
- public Record(long dataflowId, Row row) {
+ public Record(long dataflowId, long timestampMillis, Row row) {
this.dataflowId = dataflowId;
+ this.timestampMillis = timestampMillis;
this.row = checkNotNull(row);
}
@@ -54,6 +58,14 @@ public class Record implements Serializable {
this.dataflowId = dataflowId;
}
+ public long getTimestampMillis() {
+ return timestampMillis;
+ }
+
+ public void setTimestampMillis(long timestampMillis) {
+ this.timestampMillis = timestampMillis;
+ }
+
public Row getRow() {
return row;
}
@@ -73,6 +85,8 @@ public class Record implements Serializable {
}
Record that = (Record) o;
- return dataflowId == ((Record) o).dataflowId && Objects.equals(row, that.row);
+ return dataflowId == that.dataflowId
+ && Objects.equals(row, that.row)
+ && timestampMillis == that.timestampMillis;
}
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/SerializedRecord.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/SerializedRecord.java
index 1e8ad87..bb471f4 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/SerializedRecord.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/SerializedRecord.java
@@ -25,6 +25,9 @@ public class SerializedRecord implements Serializable {
private long dataFlowId;
+ // Event time
+ private long timestampMillis;
+
private byte[] data;
/**
@@ -34,8 +37,9 @@ public class SerializedRecord implements Serializable {
}
- public SerializedRecord(long dataFlowId, byte[] data) {
+ public SerializedRecord(long dataFlowId, long timestampMillis, byte[] data) {
this.dataFlowId = dataFlowId;
+ this.timestampMillis = timestampMillis;
this.data = data;
}
@@ -43,6 +47,10 @@ public class SerializedRecord implements Serializable {
this.dataFlowId = dataFlowId;
}
+ public void setTimestampMillis(long timestampMillis) {
+ this.timestampMillis = timestampMillis;
+ }
+
public void setData(byte[] data) {
this.data = data;
}
@@ -51,6 +59,10 @@ public class SerializedRecord implements Serializable {
return dataFlowId;
}
+ public long getTimestampMillis() {
+ return timestampMillis;
+ }
+
public byte[] getData() {
return data;
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
index 3928d76..37feba0 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
@@ -35,8 +35,8 @@ public class TDMsgSerializedRecord extends SerializedRecord {
super();
}
- public TDMsgSerializedRecord(String topic, byte[] data) {
- super(UNKNOWN_DATAFLOW_ID, data);
+ public TDMsgSerializedRecord(String topic, long timestampMillis, byte[] data) {
+ super(UNKNOWN_DATAFLOW_ID, timestampMillis, data);
this.topic = topic;
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
index eb8e0e4..ae194bc 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/DeserializationSchema.java
@@ -27,18 +27,27 @@ import org.apache.inlong.sort.configuration.Constants;
import org.apache.inlong.sort.flink.Record;
import org.apache.inlong.sort.flink.SerializedRecord;
import org.apache.inlong.sort.flink.TDMsgSerializedRecord;
+import org.apache.inlong.sort.flink.metrics.MetricData;
+import org.apache.inlong.sort.flink.metrics.MetricData.MetricSource;
+import org.apache.inlong.sort.flink.metrics.MetricData.MetricType;
import org.apache.inlong.sort.flink.transformation.FieldMappingTransformer;
import org.apache.inlong.sort.flink.transformation.RecordTransformer;
import org.apache.inlong.sort.meta.MetaManager;
import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DeserializationSchema extends ProcessFunction<SerializedRecord, SerializedRecord> {
+ private static final long serialVersionUID = 5380421870587560943L;
+
private static final Logger LOG = LoggerFactory.getLogger(DeserializationSchema.class);
+ private static final OutputTag<MetricData> METRIC_DATA_OUTPUT_TAG
+ = new OutputTag<MetricData>(Constants.METRIC_DATA_OUTPUT_TAG_ID) {};
+
private transient RecordTransformer recordTransformer;
private transient FieldMappingTransformer fieldMappingTransformer;
@@ -51,6 +60,8 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
private transient MetaManager metaManager;
+ private transient Boolean enableOutputMetrics;
+
public DeserializationSchema(Configuration config) {
this.config = Preconditions.checkNotNull(config);
}
@@ -63,6 +74,7 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
recordTransformer = new RecordTransformer(config.getInteger(Constants.ETL_RECORD_SERIALIZATION_BUFFER_SIZE));
metaManager = MetaManager.getInstance(config);
metaManager.registerDataFlowInfoListener(new DataFlowInfoListenerImpl());
+ enableOutputMetrics = config.getBoolean(Constants.METRICS_ENABLE_OUTPUT);
}
@Override
@@ -79,6 +91,20 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
Context context,
Collector<SerializedRecord> collector) throws Exception {
try {
+ if (enableOutputMetrics
+ && !config.getString(Constants.SOURCE_TYPE).equals(Constants.SOURCE_TYPE_TUBE)) {
+ // If source is tube, we do not output metrics of package number
+ final MetricData metricData = new MetricData(
+ // since the source cannot emit side-outputs, metrics for the source are output here instead
+ MetricSource.SOURCE,
+ MetricType.SUCCESSFUL,
+ serializedRecord.getTimestampMillis(),
+ serializedRecord.getDataFlowId(),
+ "",
+ 1L);
+ context.output(METRIC_DATA_OUTPUT_TAG, metricData);
+ }
+
if (serializedRecord instanceof TDMsgSerializedRecord
&& serializedRecord.getDataFlowId() == UNKNOWN_DATAFLOW_ID) {
final TDMsgSerializedRecord tdmsgRecord = (TDMsgSerializedRecord) serializedRecord;
@@ -87,15 +113,52 @@ public class DeserializationSchema extends ProcessFunction<SerializedRecord, Ser
tdmsgRecord,
new CallbackCollector<>(sourceRecord -> {
final Record sinkRecord = fieldMappingTransformer.transform(sourceRecord);
+
+ if (enableOutputMetrics) {
+ MetricData metricData = new MetricData(
+ // TODO, outputs this metric in Sink Function
+ MetricSource.SINK,
+ MetricType.SUCCESSFUL,
+ sinkRecord.getTimestampMillis(),
+ sinkRecord.getDataflowId(),
+ "",
+ 1);
+
+ context.output(METRIC_DATA_OUTPUT_TAG, metricData);
+ }
+
collector.collect(recordTransformer.toSerializedRecord(sinkRecord));
}));
}
} else {
- // TODO, support metrics and more data types
+ // TODO, support more data types
+ if (enableOutputMetrics
+ && !config.getString(Constants.SOURCE_TYPE).equals(Constants.SOURCE_TYPE_TUBE)) {
+ MetricData metricData = new MetricData(
+ MetricSource.DESERIALIZATION,
+ MetricType.ABANDONED,
+ serializedRecord.getTimestampMillis(),
+ serializedRecord.getDataFlowId(),
+ "Unsupported schema",
+ 1);
+ context.output(METRIC_DATA_OUTPUT_TAG, metricData);
+ }
+
LOG.warn("Abandon data due to unsupported record {}", serializedRecord);
}
} catch (Exception e) {
- // TODO, support metrics
+ if (enableOutputMetrics
+ && !config.getString(Constants.SOURCE_TYPE).equals(Constants.SOURCE_TYPE_TUBE)) {
+ MetricData metricData = new MetricData(
+ MetricSource.DESERIALIZATION,
+ MetricType.ABANDONED,
+ serializedRecord.getTimestampMillis(),
+ serializedRecord.getDataFlowId(),
+ (e.getMessage() == null || e.getMessage().isEmpty()) ? "Exception caught" : e.getMessage(),
+ 1);
+ context.output(METRIC_DATA_OUTPUT_TAG, metricData);
+ }
+
LOG.warn("Abandon data", e);
}
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
index 62cf707..aff45ce 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/deserialization/TDMsgMixedDeserializer.java
@@ -90,7 +90,7 @@ public class TDMsgMixedDeserializer implements Deserializer<TDMsgSerializedRecor
deserializer.flatMap(mixedRow, new CallbackCollector<>((row -> {
// each tid might be associated with multiple data flows
for (long dataFlowId : dataFlowIds) {
- collector.collect(new Record(dataFlowId, row));
+ collector.collect(new Record(dataFlowId, System.currentTimeMillis(), row));
}
})));
}));
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricData.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricData.java
new file mode 100644
index 0000000..0099c1b
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricData.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.metrics;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+
+public class MetricData implements Serializable {
+
+ private static final long serialVersionUID = -3294877245954460452L;
+
+ private MetricSource metricSource;
+
+ private MetricType metricType;
+
+ /**
+ * If the instance is a raw MetricData, this field represents the event time in millis of this metric.
+ *
+ * MetricData will be aggregated into different time windows,
+ * if the instance is an aggregated MetricData, this field represents the start timestamp
+ * of the aggregated window in millis.
+ */
+ private long timestampMillis;
+
+ private long dataFlowId;
+
+ private String attachment;
+
+ // User-defined partition; supports multi-level partitions
+ private String partitions;
+
+ private long count;
+
+ public MetricData() {
+ }
+
+ public MetricData(
+ MetricSource metricSource,
+ MetricType metricType,
+ long timestampMillis,
+ long dataFlowId,
+ String attachment,
+ String partitions,
+ long count) {
+ this.metricSource = Preconditions.checkNotNull(metricSource);
+ this.metricType = Preconditions.checkNotNull(metricType);
+ this.timestampMillis = timestampMillis;
+ this.dataFlowId = dataFlowId;
+ this.attachment = Preconditions.checkNotNull(attachment);
+ this.partitions = Preconditions.checkNotNull(partitions);
+ this.count = count;
+ }
+
+ public MetricData(
+ MetricSource metricSource,
+ MetricType metricType,
+ long timestampMillis,
+ long dataFlowId,
+ String attachment,
+ long count) {
+ this(metricSource,
+ metricType,
+ timestampMillis,
+ dataFlowId,
+ attachment,
+ "",
+ count);
+ }
+
+ public enum MetricType {
+ SUCCESSFUL,
+ ABANDONED
+ }
+
+ public enum MetricSource {
+ SOURCE,
+ DESERIALIZATION,
+ TRANSFORMER,
+ SINK,
+ COMMITTER
+ }
+
+ @Override
+ public String toString() {
+ return "metricSource : " + metricSource
+ + ", metricType : " + metricType
+ + ", timestampMillis : " + timestampMillis
+ + ", dataFlowId : " + dataFlowId
+ + ", attachment : " + attachment
+ + ", partitions : " + partitions
+ + ", count : " + count;
+ }
+
+ public String getKey() {
+ return metricSource.name()
+ + "|" + metricType.name()
+ + "|" + dataFlowId
+ + "|" + attachment
+ + "|" + partitions;
+ }
+
+ public MetricSource getMetricSource() {
+ return metricSource;
+ }
+
+ public void setMetricSource(MetricSource metricSource) {
+ this.metricSource = metricSource;
+ }
+
+ public MetricType getMetricType() {
+ return metricType;
+ }
+
+ public void setMetricType(MetricType metricType) {
+ this.metricType = metricType;
+ }
+
+ public long getTimestampMillis() {
+ return timestampMillis;
+ }
+
+ public void setTimestampMillis(long timestampMillis) {
+ this.timestampMillis = timestampMillis;
+ }
+
+ public long getDataFlowId() {
+ return dataFlowId;
+ }
+
+ public void setDataFlowId(long dataFlowId) {
+ this.dataFlowId = dataFlowId;
+ }
+
+ public String getAttachment() {
+ return attachment;
+ }
+
+ public void setAttachment(String attachment) {
+ this.attachment = attachment;
+ }
+
+ public String getPartitions() {
+ return partitions;
+ }
+
+ public void setPartitions(String partitions) {
+ this.partitions = partitions;
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ public void setCount(long count) {
+ this.count = count;
+ }
+
+ public void increase(long count) {
+ this.count += count;
+ }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsAggregator.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsAggregator.java
new file mode 100644
index 0000000..645c64d
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsAggregator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.metrics;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+public class MetricsAggregator {
+
+ /**
+ * Aggregate Metrics.
+ */
+ public static class MetricsAggregateFunction implements
+ AggregateFunction<MetricData, MetricAccumulator, MetricData> {
+
+ private static final long serialVersionUID = -4980743351369740395L;
+
+ @Override
+ public MetricAccumulator createAccumulator() {
+ return new MetricAccumulator();
+ }
+
+ @Override
+ public MetricAccumulator add(MetricData value, MetricAccumulator accumulator) {
+ if (accumulator.getMetricData() == null) {
+ MetricData metricData = new MetricData(
+ value.getMetricSource(),
+ value.getMetricType(),
+ value.getTimestampMillis(),
+ value.getDataFlowId(),
+ value.getAttachment(),
+ value.getPartitions(),
+ value.getCount());
+ accumulator.setMetricData(metricData);
+ } else {
+ accumulator.getMetricData().increase(value.getCount());
+ }
+
+ return accumulator;
+ }
+
+ @Override
+ public MetricData getResult(MetricAccumulator accumulator) {
+ return accumulator.getMetricData();
+ }
+
+ @Override
+ public MetricAccumulator merge(MetricAccumulator a, MetricAccumulator b) {
+ a.getMetricData().increase(b.getMetricData().getCount());
+ return a;
+ }
+ }
+
+ /**
+ * Attach partition (window start) to the aggregated MetricData.
+ */
+ public static class MetricsProcessWindowFunction extends
+ ProcessWindowFunction<MetricData, MetricData, String, TimeWindow> {
+
+ private static final long serialVersionUID = 914156043444186657L;
+
+ @Override
+ public void process(
+ String key,
+ Context context,
+ Iterable<MetricData> elements,
+ Collector<MetricData> collector) {
+ Iterator<MetricData> it = elements.iterator();
+
+ if (!it.hasNext()) {
+ return;
+ }
+
+ MetricData result = it.next();
+ while (it.hasNext()) {
+ result.increase(it.next().getCount());
+ }
+ result.setTimestampMillis(context.window().getStart());
+ collector.collect(result);
+ }
+ }
+
+ private static class MetricAccumulator implements Serializable {
+
+ private static final long serialVersionUID = -7368055708339786552L;
+
+ private MetricData metricData;
+
+ public void setMetricData(MetricData metricData) {
+ this.metricData = checkNotNull(metricData);
+ }
+
+ public MetricData getMetricData() {
+ return metricData;
+ }
+ }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsAssignerWithPeriodicWatermarks.java
similarity index 52%
copy from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
copy to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsAssignerWithPeriodicWatermarks.java
index 3928d76..4fa4757 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsAssignerWithPeriodicWatermarks.java
@@ -15,36 +15,25 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.flink;
+package org.apache.inlong.sort.flink.metrics;
-import static org.apache.inlong.sort.configuration.Constants.UNKNOWN_DATAFLOW_ID;
+import javax.annotation.Nullable;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
-/**
- * Data flow id might not been got from mixed TDMsg data stream.
- */
-public class TDMsgSerializedRecord extends SerializedRecord {
-
- private static final long serialVersionUID = 4075321919886376829L;
-
- private String topic;
-
- /**
- * Just satisfy requirement of Flink Pojo definition.
- */
- public TDMsgSerializedRecord() {
- super();
- }
-
- public TDMsgSerializedRecord(String topic, byte[] data) {
- super(UNKNOWN_DATAFLOW_ID, data);
- this.topic = topic;
- }
+public class MetricsAssignerWithPeriodicWatermarks implements AssignerWithPeriodicWatermarks<MetricData> {
+ private long currentMaxTimestamp;
- public String getTopic() {
- return topic;
+ @Nullable
+ @Override
+ public Watermark getCurrentWatermark() {
+ return new Watermark(currentMaxTimestamp);
}
- public void setTopic(String topic) {
- this.topic = topic;
+ @Override
+ public long extractTimestamp(MetricData metricData, long previousElementTimestamp) {
+ long timestamp = metricData.getTimestampMillis();
+ currentMaxTimestamp = Math.max(timestamp, previousElementTimestamp);
+ return timestamp;
}
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsLogSink.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsLogSink.java
new file mode 100644
index 0000000..5b4b5f0
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/metrics/MetricsLogSink.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.metrics;
+
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetricsLogSink extends RichSinkFunction<MetricData> implements CheckpointedFunction {
+
+ private static final long serialVersionUID = -2741257263371632560L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(MetricsLogSink.class);
+
+ private transient SimpleDateFormat partitionSecondDateFormat;
+
+ private transient SimpleDateFormat partitionDayDateFormat;
+
+ public MetricsLogSink() {
+ }
+
+ @Override
+ public void open(Configuration configuration) throws Exception {
+ partitionSecondDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ partitionDayDateFormat = new SimpleDateFormat("yyyyMMdd");
+ }
+
+ @Override
+ public void invoke(MetricData metricData, Context context) throws Exception {
+ Timestamp timestamp = new Timestamp(metricData.getTimestampMillis());
+ LOG.info("Record metric, partition_day = '{}', partition_second = '{}', metric_source = '{}',"
+ + " metric_type = '{}', attachment = '{}', dataflow_id = '{}', user_partition = '{}', count = '{}'",
+ Long.valueOf(partitionDayDateFormat.format(timestamp)),
+ partitionSecondDateFormat.format(timestamp),
+ metricData.getMetricSource(),
+ metricData.getMetricType(),
+ metricData.getAttachment(),
+ metricData.getDataFlowId(),
+ metricData.getPartitions(),
+ metricData.getCount());
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+ }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformer.java
index e4485c2..c4414ae 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformer.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.flink.transformation;
+import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.types.Row;
@@ -26,6 +27,8 @@ import org.apache.inlong.sort.protocol.DataFlowInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.sink.SinkInfo;
import org.apache.inlong.sort.protocol.source.SourceInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.inlong.sort.configuration.Constants.DATA_TIME_FIELD;
@@ -34,6 +37,8 @@ import static org.apache.inlong.sort.configuration.Constants.DATA_TIME_FIELD;
*/
public class FieldMappingTransformer implements DataFlowInfoListener {
+ private static final Logger LOG = LoggerFactory.getLogger(FieldMappingTransformer.class);
+
/**
* Skips time and attribute fields of source record.
*
@@ -71,7 +76,15 @@ public class FieldMappingTransformer implements DataFlowInfoListener {
sinkRow.setField(i, sourceRow.getField(fieldIndex));
}
}
- return new Record(sourceRecord.getDataflowId(), sinkRow);
+
+ long recordTimestampMillis;
+ try {
+ recordTimestampMillis = ((Timestamp) sourceRow.getField(0)).getTime();
+ } catch (Exception e) {
+ recordTimestampMillis = System.currentTimeMillis();
+ LOG.warn("Failed to extract timestamp from data.", e);
+ }
+ return new Record(sourceRecord.getDataflowId(), recordTimestampMillis, sinkRow);
}
/**
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/RecordTransformer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/RecordTransformer.java
index ba0b36e..6facf74 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/RecordTransformer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/transformation/RecordTransformer.java
@@ -78,6 +78,7 @@ public class RecordTransformer implements DataFlowInfoListener {
try {
rowSerializer.serialize(newRecord.getRow(), dataOutputSerializer);
serializedRecord = new SerializedRecord(dataFlowId,
+ record.getTimestampMillis(),
dataOutputSerializer.getCopyOfBuffer());
dataOutputSerializer.pruneBuffer();
} catch (Exception e) {
@@ -101,7 +102,7 @@ public class RecordTransformer implements DataFlowInfoListener {
} finally {
dataInputDeserializer.releaseArrays();
}
- return new Record(dataFlowId, row);
+ return new Record(dataFlowId, serializedRecord.getTimestampMillis(), row);
}
private RowSerializer getRowSerializer(long dataFlowId) throws Exception {
@@ -127,7 +128,7 @@ public class RecordTransformer implements DataFlowInfoListener {
newRow.setField(i, null);
}
}
- return new Record(oldRecord.getDataflowId(), newRow);
+ return new Record(oldRecord.getDataflowId(), oldRecord.getTimestampMillis(), newRow);
}
@VisibleForTesting
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
index 09b13c0..028a89a 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
@@ -396,7 +396,8 @@ public class MultiTenancyTubeConsumer {
synchronized (context.getCheckpointLock()) {
for (Message message : consumeResult.getMessageList()) {
// TODO, optimize for single tid or no tid topic
- context.collect(new TDMsgSerializedRecord(topic, message.getData()));
+ context.collect(new TDMsgSerializedRecord(
+ topic, System.currentTimeMillis(), message.getData()));
}
final String partitionKey = consumeResult.getPartitionKey();
final long offset = consumeResult.getCurrOffset();
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
index 99a0d89..397544e 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/deserialization/MultiTenancyTDMsgMixedDeserializerTest.java
@@ -84,7 +84,7 @@ public class MultiTenancyTDMsgMixedDeserializerTest extends TestLogger {
tdMsg1.addMsg(attrs, body1.getBytes());
final TestingCollector<Record> collector = new TestingCollector<>();
- deserializer.deserialize(new TDMsgSerializedRecord("topic", tdMsg1.buildArray()), collector);
+ deserializer.deserialize(new TDMsgSerializedRecord("topic", 0, tdMsg1.buildArray()), collector);
assertEquals(1, collector.results.size());
assertEquals(1L, collector.results.get(0).getDataflowId());
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/hive/HiveSinkITCase.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/hive/HiveSinkITCase.java
index dd6f7dd..1d60913 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/hive/HiveSinkITCase.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/hive/HiveSinkITCase.java
@@ -380,7 +380,8 @@ public class HiveSinkITCase extends TestLogger {
row.setField(1, fieldValue2);
row.setField(2, line[0]);
row.setField(3, line[1]);
- sourceContext.collect(transformer.toSerializedRecord(new Record(dataFlowInfo.getId(), row)));
+ sourceContext.collect(transformer.toSerializedRecord(
+ new Record(dataFlowInfo.getId(), System.currentTimeMillis(), row)));
}
verificationFinishedLatch.await();
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/metrics/MetricDataTest.java
similarity index 51%
copy from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
copy to inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/metrics/MetricDataTest.java
index 3928d76..0744559 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/TDMsgSerializedRecord.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/metrics/MetricDataTest.java
@@ -15,36 +15,18 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.flink;
-
-import static org.apache.inlong.sort.configuration.Constants.UNKNOWN_DATAFLOW_ID;
-
-/**
- * Data flow id might not been got from mixed TDMsg data stream.
- */
-public class TDMsgSerializedRecord extends SerializedRecord {
-
- private static final long serialVersionUID = 4075321919886376829L;
-
- private String topic;
-
- /**
- * Just satisfy requirement of Flink Pojo definition.
- */
- public TDMsgSerializedRecord() {
- super();
- }
-
- public TDMsgSerializedRecord(String topic, byte[] data) {
- super(UNKNOWN_DATAFLOW_ID, data);
- this.topic = topic;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
+package org.apache.inlong.sort.flink.metrics;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MetricDataTest {
+ @Test
+ public void testPOJO() {
+ Assert.assertEquals(
+ "org.apache.flink.api.java.typeutils.runtime.PojoSerializer",
+ TypeInformation.of(MetricData.class).createSerializer(new ExecutionConfig()).getClass().getName());
}
}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/metrics/MetricsLogSinkTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/metrics/MetricsLogSinkTest.java
new file mode 100644
index 0000000..827c5eb
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/metrics/MetricsLogSinkTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.metrics;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context;
+import org.apache.inlong.sort.flink.metrics.MetricData.MetricSource;
+import org.apache.inlong.sort.flink.metrics.MetricData.MetricType;
+import org.junit.Test;
+
+public class MetricsLogSinkTest {
+ @Test
+ public void test() throws Exception {
+ final org.apache.flink.configuration.Configuration flinkConfig
+ = new org.apache.flink.configuration.Configuration();
+ MetricsLogSink metricsLogSink = new MetricsLogSink();
+ metricsLogSink.open(flinkConfig);
+
+ MetricData metricData = new MetricData(
+ MetricSource.SOURCE,
+ MetricType.SUCCESSFUL,
+ 0,
+ 1,
+ "attach",
+ 1
+ );
+ metricsLogSink.invoke(metricData, new Context() {
+ @Override
+ public long currentProcessingTime() {
+ return 0;
+ }
+
+ @Override
+ public long currentWatermark() {
+ return 0;
+ }
+
+ @Override
+ public Long timestamp() {
+ return null;
+ }
+ });
+ }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformerTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformerTest.java
index 43aab2d..6e37aae 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformerTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/FieldMappingTransformerTest.java
@@ -56,7 +56,7 @@ public class FieldMappingTransformerTest extends TestLogger {
sourceRow.setField(1, "attr");
sourceRow.setField(2, "not important");
sourceRow.setField(3, 9527L);
- final Record sourceRecord = new Record(dataFlowId, sourceRow);
+ final Record sourceRecord = new Record(dataFlowId, System.currentTimeMillis(), sourceRow);
final Record sinkRecord = transformer.transform(sourceRecord);
assertEquals(dataFlowId, sinkRecord.getDataflowId());
assertEquals(1, sinkRecord.getRow().getArity());
@@ -81,7 +81,7 @@ public class FieldMappingTransformerTest extends TestLogger {
sourceRow.setField(0, dt);
sourceRow.setField(1, "attr");
sourceRow.setField(2, 9527L);
- final Record sourceRecord = new Record(dataFlowId, sourceRow);
+ final Record sourceRecord = new Record(dataFlowId, System.currentTimeMillis(), sourceRow);
final Record sinkRecord = transformer.transform(sourceRecord);
assertEquals(dataFlowId, sinkRecord.getDataflowId());
assertEquals(2, sinkRecord.getRow().getArity());
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/RecordTransformerTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/RecordTransformerTest.java
index f884e23..f1f9111 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/RecordTransformerTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/transformation/RecordTransformerTest.java
@@ -53,7 +53,7 @@ public class RecordTransformerTest extends TestLogger {
final Row row = new Row(2);
row.setField(0, 1024L);
row.setField(1, "9527");
- final Record record = new Record(1L, row);
+ final Record record = new Record(1L, System.currentTimeMillis(), row);
final Record transformed = transformer.toRecord(transformer.toSerializedRecord(record));
assertEquals(record, transformed);
// check the buffers
@@ -81,7 +81,7 @@ public class RecordTransformerTest extends TestLogger {
final Row row = new Row(2);
row.setField(0, 1024L);
row.setField(1, "9527");
- final Record record = new Record(1L, row);
+ final Record record = new Record(1L, System.currentTimeMillis(), row);
assertSame(record, transformer.matchRecordAndSerializerField(record, rowSerializers.get(1L)));
}
@@ -101,7 +101,7 @@ public class RecordTransformerTest extends TestLogger {
final Row oneFieldRow = new Row(1);
oneFieldRow.setField(0, 1024L);
- final Record oneFieldRecord = new Record(1L, oneFieldRow);
+ final Record oneFieldRecord = new Record(1L, System.currentTimeMillis(), oneFieldRow);
assertEquals(2,
transformer.matchRecordAndSerializerField(oneFieldRecord, rowSerializers.get(1L)).getRow().getArity());
@@ -110,7 +110,7 @@ public class RecordTransformerTest extends TestLogger {
threeFieldRow.setField(0, 1024L);
threeFieldRow.setField(1, "9527");
threeFieldRow.setField(2, 2048);
- final Record threeFieldRecord = new Record(1L, threeFieldRow);
+ final Record threeFieldRecord = new Record(1L, System.currentTimeMillis(), threeFieldRow);
assertEquals(2,
transformer.matchRecordAndSerializerField(threeFieldRecord, rowSerializers.get(1L)).getRow()
@@ -123,7 +123,7 @@ public class RecordTransformerTest extends TestLogger {
final Row row = new Row(2);
row.setField(0, 1024L);
row.setField(1, "9527");
- final Record record = new Record(1L, row);
+ final Record record = new Record(1L, System.currentTimeMillis(), row);
final int bufferSize = 1024;
final RecordTransformer transformer = new RecordTransformer(bufferSize);
@@ -165,7 +165,7 @@ public class RecordTransformerTest extends TestLogger {
final Row row = new Row(2);
row.setField(0, 1024L);
row.setField(1, 2048);
- final Record record = new Record(1L, row);
+ final Record record = new Record(1L, System.currentTimeMillis(), row);
try {
transformer.toSerializedRecord(record);