You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/21 10:57:23 UTC

[inlong] 01/02: [INLONG-5959][Sort] Support metric state recovery for filesystem (#5961)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d32537802972d454a81808994947f632b611441d
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Tue Sep 20 17:42:03 2022 +0800

    [INLONG-5959][Sort] Support metric state recovery for filesystem (#5961)
---
 .../filesystem/stream/AbstractStreamingWriter.java | 37 +++++++++++++++++++---
 1 file changed, 33 insertions(+), 4 deletions(-)

diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index 95267c698..9edcc82b2 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -18,6 +18,10 @@
 
 package org.apache.inlong.sort.filesystem.stream;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -34,7 +38,13 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 
 /**
  * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send
@@ -53,9 +63,12 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
             IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>>
             bucketsBuilder;
 
-    private String inlongMetric;
+    private final String inlongMetric;
+    private final String inlongAudit;
 
-    private String inlongAudit;
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
+    private SinkMetricData sinkMetricData;
 
     // --------------------------- runtime fields -----------------------------
 
@@ -103,11 +116,13 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
         super.open();
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withRegisterMetric(RegisteredMetric.ALL)
                 .withInlongAudit(inlongAudit)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
-            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+            sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
     }
 
@@ -149,12 +164,26 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
                         bucketCheckInterval);
 
         currentWatermark = Long.MIN_VALUE;
+        if (this.inlongMetric != null) {
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                    })));
+        }
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+        }
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
         helper.snapshotState(context.getCheckpointId());
+        if (sinkMetricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
     }
 
     @Override