You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/10/12 06:21:09 UTC
[inlong] branch master updated: [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4fa25ee3b [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150)
4fa25ee3b is described below
commit 4fa25ee3b946cc5aa678d918a5cb8c27397d299b
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Wed Oct 12 14:21:03 2022 +0800
[INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150)
---
.../sort/iceberg/flink/sink/RowDataTaskWriterFactory.java | 10 +++++++++-
.../inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java | 10 +++++++++-
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 36240760f..84aaf2af1 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -36,6 +36,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ArrayUtil;
import java.util.List;
@@ -79,8 +81,14 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+ } else if (upsert) {
+ // In upsert mode, only the new row is emitted, using the INSERT row kind. Any column of the inserted row other
+ // than the primary key fields may therefore differ from the deleted row, while the delete file must contain
+ // values that are correct for the deleted row. Hence, only the equality delete fields are written.
+ this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
+ ArrayUtil.toIntArray(equalityFieldIds),
+ TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
} else {
- // TODO provide the ability to customize the equality-delete row schema.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
index 831ef6a4a..aa724ac7f 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
@@ -36,6 +36,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ArrayUtil;
import java.util.List;
@@ -79,8 +81,14 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+ } else if (upsert) {
+ // In upsert mode, only the new row is emitted, using the INSERT row kind. Any column of the inserted row other
+ // than the primary key fields may therefore differ from the deleted row, while the delete file must contain
+ // values that are correct for the deleted row. Hence, only the equality delete fields are written.
+ this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
+ ArrayUtil.toIntArray(equalityFieldIds),
+ TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
} else {
- // TODO provide the ability to customize the equality-delete row schema.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
}