You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by st...@apache.org on 2023/02/28 02:19:04 UTC
[iceberg] branch master updated: Flink: fix wrong projection when delete records in upsert mode (#6753)
This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 353c7989c8 Flink: fix wrong projection when delete records in upsert mode (#6753)
353c7989c8 is described below
commit 353c7989c891c73e866f89a23bce0e140ab8bb2e
Author: Chen, Junjie <ji...@tencent.com>
AuthorDate: Tue Feb 28 10:18:56 2023 +0800
Flink: fix wrong projection when delete records in upsert mode (#6753)
---
.../apache/iceberg/flink/sink/BaseDeltaTaskWriter.java | 6 +++++-
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 18 ++++++++++++++++++
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index b8786f259a..b1ffda1560 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -94,7 +94,11 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
writer.delete(row);
break;
case DELETE:
- writer.delete(row);
+ if (upsert) {
+ writer.deleteKey(keyProjection.wrap(row));
+ } else {
+ writer.delete(row);
+ }
break;
default:
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 7733eeb4a3..b28af0706b 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -155,6 +155,24 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
testChangeLogOnIdKey(SnapshotRef.MAIN_BRANCH);
}
+ @Test
+ public void testUpsertOnlyDeletesOnDataKey() throws Exception {
+ List<List<Row>> elementsPerCheckpoint =
+ ImmutableList.of(
+ ImmutableList.of(row("+I", 1, "aaa")),
+ ImmutableList.of(row("-D", 1, "aaa"), row("-D", 2, "bbb")));
+
+ List<List<Record>> expectedRecords =
+ ImmutableList.of(ImmutableList.of(record(1, "aaa")), ImmutableList.of());
+
+ testChangeLogs(
+ ImmutableList.of("data"),
+ row -> row.getField(ROW_DATA_POS),
+ true,
+ elementsPerCheckpoint,
+ expectedRecords);
+ }
+
@Test
public void testChangeLogOnDataKey() throws Exception {
testChangeLogOnDataKey(SnapshotRef.MAIN_BRANCH);