Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/29 21:06:09 UTC

[iceberg] 07/09: Flink: Fix CDC validation errors (#3258)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.12.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 9975c2bb6abac363a781ba49f80554669d7f67c4
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Tue Oct 19 04:56:26 2021 -0700

    Flink: Fix CDC validation errors (#3258)
---
 .../iceberg/flink/sink/IcebergFilesCommitter.java  | 12 +++--
 .../flink/sink/TestIcebergFilesCommitter.java      | 60 ----------------------
 2 files changed, 8 insertions(+), 64 deletions(-)

diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index ff9174a..010df8c 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -50,7 +50,6 @@ import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Comparators;
@@ -282,9 +281,14 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
         // txn2, the equality-delete files of txn2 are required to be applied to data files from txn1. Committing the
         // merged result would lead to incorrect delete semantics.
         WriteResult result = e.getValue();
-        RowDelta rowDelta = table.newRowDelta()
-            .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
-            .validateDeletedFiles();
+
+        // Row delta validations are not needed for streaming changes that write equality deletes. Equality deletes
+        // are applied to data in all previous sequence numbers, so retries may push deletes further in the future,
+        // but do not affect correctness. Position deletes committed to the table in this path are used only to delete
+        // rows from data files that are being added in this commit. There is no way for data files added along with
+        // the delete files to be concurrently removed, so there is no need to validate the files referenced by the
+        // position delete files that are being committed.
+        RowDelta rowDelta = table.newRowDelta();
 
         int numDataFiles = result.dataFiles().length;
         Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
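
Putting the hunk above together, a minimal sketch of the v2 CDC commit loop after
this change. CdcCommitSketch, commitPending, and the pendingResults map are
simplified stand-ins for the committer's internal state, not names from the patch:

    import java.util.Arrays;
    import java.util.SortedMap;

    import org.apache.iceberg.RowDelta;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.WriteResult;

    class CdcCommitSketch {
      // Commit each checkpoint's WriteResult as its own RowDelta snapshot, in
      // checkpoint order, so that equality deletes from a later checkpoint are
      // still applied on top of data files committed by an earlier one.
      static void commitPending(Table table, SortedMap<Long, WriteResult> pendingResults) {
        for (WriteResult result : pendingResults.values()) {
          RowDelta rowDelta = table.newRowDelta();

          // Rows added in this checkpoint.
          Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);

          // Equality and position deletes from this checkpoint. Position deletes
          // only reference data files added in this same commit, so the
          // validateDataFilesExist()/validateDeletedFiles() calls are omitted.
          Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);

          rowDelta.commit();
        }
      }
    }

Committing one RowDelta per checkpoint, rather than a merged one, is what
preserves the delete ordering described in the comment above.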
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 34785cf..8a94bee 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -680,66 +680,6 @@ public class TestIcebergFilesCommitter extends TableTestBase {
   }
 
   @Test
-  public void testValidateDataFileExist() throws Exception {
-    Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2);
-    long timestamp = 0;
-    long checkpoint = 10;
-    JobID jobId = new JobID();
-    FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
-
-    RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
-    DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(insert1));
-
-    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
-      harness.setup();
-      harness.open();
-
-      // Txn#1: insert the row <1, 'aaa'>
-      harness.processElement(WriteResult.builder()
-              .addDataFiles(dataFile1)
-              .build(),
-          ++timestamp);
-      harness.snapshot(checkpoint, ++timestamp);
-      harness.notifyOfCompletedCheckpoint(checkpoint);
-
-      // Txn#2: Overwrite the committed data-file-1
-      RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
-      DataFile dataFile2 = writeDataFile("data-file-2", ImmutableList.of(insert2));
-      new TestTableLoader(tablePath)
-          .loadTable()
-          .newOverwrite()
-          .addFile(dataFile2)
-          .deleteFile(dataFile1)
-          .commit();
-    }
-
-    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
-      harness.setup();
-      harness.open();
-
-      // Txn#3: position-delete the <1, 'aaa'> (NOT committed).
-      DeleteFile deleteFile1 = writePosDeleteFile(appenderFactory,
-          "pos-delete-file-1",
-          ImmutableList.of(Pair.of(dataFile1.path(), 0L)));
-      harness.processElement(WriteResult.builder()
-              .addDeleteFiles(deleteFile1)
-              .addReferencedDataFiles(dataFile1.path())
-              .build(),
-          ++timestamp);
-      harness.snapshot(++checkpoint, ++timestamp);
-
-      // Txn#3: validate will be failure when committing.
-      final long currentCheckpointId = checkpoint;
-      AssertHelpers.assertThrows("Validation should be failure because of non-exist data files.",
-          ValidationException.class, "Cannot commit, missing data files",
-          () -> {
-            harness.notifyOfCompletedCheckpoint(currentCheckpointId);
-            return null;
-          });
-    }
-  }
-
-  @Test
   public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2);
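
The test deleted above covered exactly the validation this commit removes: a
position delete referencing data-file-1 after a concurrent overwrite removed it
used to fail the commit with "Cannot commit, missing data files". A caller that
still wants that stricter check can request it through the RowDelta API
directly. A minimal sketch, assuming table is a loaded org.apache.iceberg.Table,
result is a WriteResult whose position deletes reference previously committed
data files, and ImmutableList is the relocated Guava class imported in the old
code:

    // Hedged sketch of opting back in to the checks removed from the Flink
    // path; this mirrors the builder chain deleted in IcebergFilesCommitter.
    RowDelta validated = table.newRowDelta()
        // Fail if any data file referenced by a position delete is missing.
        .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
        // Also fail if a referenced data file was removed by a concurrent commit.
        .validateDeletedFiles();
    // Throws ValidationException ("Cannot commit, missing data files") if a
    // referenced data file no longer exists in the table.
    validated.commit();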