You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/09/27 02:16:58 UTC
[inlong] branch release-1.3.0 updated: [INLONG-6027][Sort] Dlc did not restore metric data successfully (#6028)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 6323e541e [INLONG-6027][Sort] Dlc did not restore metric data successfully (#6028)
6323e541e is described below
commit 6323e541e72b2257c44a4cf89bd96a9c95dce1ab
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Mon Sep 26 22:39:48 2022 +0800
[INLONG-6027][Sort] Dlc did not restore metric data successfully (#6028)
---
.../apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
index ef7612743..6096a4232 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
@@ -56,12 +56,12 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
private final String fullTableName;
private final TaskWriterFactory<T> taskWriterFactory;
+ @Nullable
private final MetricOption metricOption;
private transient TaskWriter<T> writer;
private transient int subTaskId;
private transient int attemptId;
- @Nullable
private transient SinkMetricData metricData;
private transient ListState<MetricState> metricStateListState;
private transient MetricState metricState;
@@ -115,7 +115,7 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
// init metric state
- if (this.metricData != null) {
+ if (this.metricOption != null) {
this.metricStateListState = context.getOperatorStateStore().getUnionListState(
new ListStateDescriptor<>(
INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {