You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/09/27 02:16:58 UTC

[inlong] branch release-1.3.0 updated: [INLONG-6027][Sort] Dlc did not restore metric data successfully (#6028)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 6323e541e [INLONG-6027][Sort] Dlc did not restore metric data successfully (#6028)
6323e541e is described below

commit 6323e541e72b2257c44a4cf89bd96a9c95dce1ab
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Mon Sep 26 22:39:48 2022 +0800

    [INLONG-6027][Sort] Dlc did not restore metric data successfully (#6028)
---
 .../apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
index ef7612743..6096a4232 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
@@ -56,12 +56,12 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
 
     private final String fullTableName;
     private final TaskWriterFactory<T> taskWriterFactory;
+    @Nullable
     private final MetricOption metricOption;
 
     private transient TaskWriter<T> writer;
     private transient int subTaskId;
     private transient int attemptId;
-    @Nullable
     private transient SinkMetricData metricData;
     private transient ListState<MetricState> metricStateListState;
     private transient MetricState metricState;
@@ -115,7 +115,7 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
         // init metric state
-        if (this.metricData != null) {
+        if (this.metricOption != null) {
             this.metricStateListState = context.getOperatorStateStore().getUnionListState(
                     new ListStateDescriptor<>(
                             INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {