You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by st...@apache.org on 2023/02/28 02:19:04 UTC

[iceberg] branch master updated: Flink: fix wrong projection when delete records in upsert mode (#6753)

This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 353c7989c8 Flink: fix wrong projection when delete records in upsert mode (#6753)
353c7989c8 is described below

commit 353c7989c891c73e866f89a23bce0e140ab8bb2e
Author: Chen, Junjie <ji...@tencent.com>
AuthorDate: Tue Feb 28 10:18:56 2023 +0800

    Flink: fix wrong projection when delete records in upsert mode (#6753)
---
 .../apache/iceberg/flink/sink/BaseDeltaTaskWriter.java |  6 +++++-
 .../iceberg/flink/sink/TestFlinkIcebergSinkV2.java     | 18 ++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index b8786f259a..b1ffda1560 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -94,7 +94,11 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
         writer.delete(row);
         break;
       case DELETE:
-        writer.delete(row);
+        if (upsert) {
+          writer.deleteKey(keyProjection.wrap(row));
+        } else {
+          writer.delete(row);
+        }
         break;
 
       default:
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 7733eeb4a3..b28af0706b 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -155,6 +155,24 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
     testChangeLogOnIdKey(SnapshotRef.MAIN_BRANCH);
   }
 
+  @Test
+  public void testUpsertOnlyDeletesOnDataKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint =
+        ImmutableList.of(
+            ImmutableList.of(row("+I", 1, "aaa")),
+            ImmutableList.of(row("-D", 1, "aaa"), row("-D", 2, "bbb")));
+
+    List<List<Record>> expectedRecords =
+        ImmutableList.of(ImmutableList.of(record(1, "aaa")), ImmutableList.of());
+
+    testChangeLogs(
+        ImmutableList.of("data"),
+        row -> row.getField(ROW_DATA_POS),
+        true,
+        elementsPerCheckpoint,
+        expectedRecords);
+  }
+
   @Test
   public void testChangeLogOnDataKey() throws Exception {
     testChangeLogOnDataKey(SnapshotRef.MAIN_BRANCH);