Posted to commits@inlong.apache.org by zi...@apache.org on 2022/10/12 06:21:09 UTC

[inlong] branch master updated: [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fa25ee3b [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150)
4fa25ee3b is described below

commit 4fa25ee3b946cc5aa678d918a5cb8c27397d299b
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Wed Oct 12 14:21:03 2022 +0800

    [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150)
---
 .../sort/iceberg/flink/sink/RowDataTaskWriterFactory.java      | 10 +++++++++-
 .../inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java     | 10 +++++++++-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 36240760f..84aaf2af1 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -36,6 +36,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ArrayUtil;
 
 import java.util.List;
@@ -79,8 +81,14 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
 
         if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
             this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+        } else if (upsert) {
+            // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of the inserted
+            // row may differ from the deleted row other than the primary key fields, and the delete file must contain
+            // values that are correct for the deleted row. Therefore, only write the equality delete fields.
+            this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
+                    ArrayUtil.toIntArray(equalityFieldIds),
+                    TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
         } else {
-            // TODO provide the ability to customize the equality-delete row schema.
             this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
                     ArrayUtil.toIntArray(equalityFieldIds), schema, null);
         }
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
index 831ef6a4a..aa724ac7f 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
@@ -36,6 +36,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ArrayUtil;
 
 import java.util.List;
@@ -79,8 +81,14 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
 
         if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
             this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+        } else if (upsert) {
+            // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of the inserted
+            // row may differ from the deleted row other than the primary key fields, and the delete file must contain
+            // values that are correct for the deleted row. Therefore, only write the equality delete fields.
+            this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
+                    ArrayUtil.toIntArray(equalityFieldIds),
+                    TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
         } else {
-            // TODO provide the ability to customize the equality-delete row schema.
             this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
                     ArrayUtil.toIntArray(equalityFieldIds), schema, null);
         }
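
For context, the crux of the change above is that in upsert mode the equality-delete appender is now given a row schema projected down to the equality fields (via TypeUtil.select) instead of the full table schema, so the delete rows it writes match the columns actually populated. The standalone sketch below is not part of the commit; the table layout, field ids, and class name are illustrative only. It simply shows what such a projection produces for a hypothetical table whose primary key is "id".

import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

import java.util.Arrays;
import java.util.List;

public class EqualityDeleteSchemaSketch {
    public static void main(String[] args) {
        // Hypothetical table schema: "id" is the equality (primary key) field,
        // "name" and "age" are ordinary data columns.
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "name", Types.StringType.get()),
                Types.NestedField.optional(3, "age", Types.IntegerType.get()));

        List<Integer> equalityFieldIds = Arrays.asList(1);

        // Before the fix, the equality-delete appender was built with the full table schema,
        // while in upsert mode only the equality key columns are populated for the delete row.
        // Projecting the schema onto the equality fields keeps the delete row schema aligned
        // with the values that are actually written.
        Schema eqDeleteRowSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));

        // Prints a schema containing only the "id" field.
        System.out.println(eqDeleteRowSchema);
    }
}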