You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/08 02:27:23 UTC

[inlong] branch master updated: [INLONG-7546][Sort] Fix dirty data not archived for iceberg connector (#7547)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c136fa908 [INLONG-7546][Sort] Fix dirty data not archived for iceberg connector (#7547)
c136fa908 is described below

commit c136fa908b81b45879e11e69df4debb88266032a
Author: LinChen <lo...@gmail.com>
AuthorDate: Wed Mar 8 10:27:18 2023 +0800

    [INLONG-7546][Sort] Fix dirty data not archived for iceberg connector (#7547)
---
 .../sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java   | 8 ++++++++
 .../sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java     | 8 ++++++++
 2 files changed, 16 insertions(+)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index db784c14b..7d2973a3d 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -147,6 +147,14 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
             sinkMetricData = new SinkTableMetricData(metricOption, runtimeContext.getMetricGroup());
             sinkMetricData.registerSubMetricsGroup(metricState);
         }
+
+        if (dirtySink != null) {
+            try {
+                dirtySink.open(new Configuration());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
index a2117daf1..502c47008 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
@@ -123,6 +123,14 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
             if (metricOption != null) {
                 metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
             }
+
+            if (dirtySink != null) {
+                try {
+                    dirtySink.open(new Configuration());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
         }
     }