You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/21 09:25:47 UTC
[inlong] 01/05: [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 632a0261b0629692430b1d9d7e42a7385910e6e3
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Wed Sep 21 16:02:06 2022 +0800
[INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)
Co-authored-by: thesumery <15...@qq.com>
---
.../hive/filesystem/AbstractStreamingWriter.java | 31 ++++++++++++++++++++++
1 file changed, 31 insertions(+)
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index ab2e845f2..dd9456203 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -18,6 +18,10 @@
package org.apache.inlong.sort.hive.filesystem;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -34,9 +38,16 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+
import javax.annotation.Nullable;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
/**
* Operator for file system sink. It is an operator version of {@link StreamingFileSink}. It can send
* file and bucket information to downstream.
@@ -70,6 +81,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
@Nullable
private transient SinkMetricData metricData;
+ private transient ListState<MetricState> metricStateListState;
+ private transient MetricState metricState;
public AbstractStreamingWriter(
long bucketCheckInterval,
@@ -113,6 +126,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
.withInlongAudit(auditHostAndPorts)
+ .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+ .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withRegisterMetric(RegisteredMetric.ALL)
.build();
if (metricOption != null) {
@@ -151,12 +166,28 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
bucketCheckInterval);
currentWatermark = Long.MIN_VALUE;
+
+ // init metric state
+ if (this.inlongMetric != null) {
+ this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+ new ListStateDescriptor<>(
+ INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+ })));
+ }
+ if (context.isRestored()) {
+ metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+ getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+ }
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
helper.snapshotState(context.getCheckpointId());
+ if (metricData != null && metricStateListState != null) {
+ MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
}
@Override