You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/21 10:57:23 UTC
[inlong] 01/02: [INLONG-5959][Sort] Support metric state recovery for filesystem (#5961)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit d32537802972d454a81808994947f632b611441d
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Tue Sep 20 17:42:03 2022 +0800
[INLONG-5959][Sort] Support metric state recovery for filesystem (#5961)
---
.../filesystem/stream/AbstractStreamingWriter.java | 37 +++++++++++++++++++---
1 file changed, 33 insertions(+), 4 deletions(-)
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index 95267c698..9edcc82b2 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -18,6 +18,10 @@
package org.apache.inlong.sort.filesystem.stream;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -34,7 +38,13 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
/**
* Operator for file system sink. It is an operator version of {@link StreamingFileSink}. It can send
@@ -53,9 +63,12 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>>
bucketsBuilder;
- private String inlongMetric;
+ private final String inlongMetric;
+ private final String inlongAudit;
- private String inlongAudit;
+ private transient ListState<MetricState> metricStateListState;
+ private transient MetricState metricState;
+ private SinkMetricData sinkMetricData;
// --------------------------- runtime fields -----------------------------
@@ -103,11 +116,13 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
super.open();
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withRegisterMetric(RegisteredMetric.ALL)
.withInlongAudit(inlongAudit)
+ .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+ .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withRegisterMetric(RegisteredMetric.ALL)
.build();
if (metricOption != null) {
- metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+ sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
}
}
@@ -149,12 +164,26 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
bucketCheckInterval);
currentWatermark = Long.MIN_VALUE;
+ if (this.inlongMetric != null) {
+ this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+ new ListStateDescriptor<>(
+ INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+ })));
+ }
+ if (context.isRestored()) {
+ metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+ getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+ }
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
helper.snapshotState(context.getCheckpointId());
+ if (sinkMetricData != null && metricStateListState != null) {
+ MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData,
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
}
@Override