You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by op...@apache.org on 2021/10/19 11:56:35 UTC
[iceberg] branch master updated: Flink: Fix CDC validation errors (#3258)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 0abaa7c Flink: Fix CDC validation errors (#3258)
0abaa7c is described below
commit 0abaa7ce670f74493cdc5aad43ec56c25d4a54d3
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Tue Oct 19 04:56:26 2021 -0700
Flink: Fix CDC validation errors (#3258)
---
.../iceberg/flink/sink/IcebergFilesCommitter.java | 12 +++--
.../flink/sink/TestIcebergFilesCommitter.java | 60 ----------------------
2 files changed, 8 insertions(+), 64 deletions(-)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index ff9174a..010df8c 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -50,7 +50,6 @@ import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
@@ -282,9 +281,14 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// txn2, the equality-delete files of txn2 are required to be applied to data files from txn1. Committing the
// merged one will lead to the incorrect delete semantic.
WriteResult result = e.getValue();
- RowDelta rowDelta = table.newRowDelta()
- .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
- .validateDeletedFiles();
+
+ // Row delta validations are not needed for streaming changes that write equality deletes. Equality deletes
+ // are applied to data in all previous sequence numbers, so retries may push deletes further in the future,
+ // but do not affect correctness. Position deletes committed to the table in this path are used only to delete
+ // rows from data files that are being added in this commit. There is no way for data files added along with
+ // the delete files to be concurrently removed, so there is no need to validate the files referenced by the
+ // position delete files that are being committed.
+ RowDelta rowDelta = table.newRowDelta();
int numDataFiles = result.dataFiles().length;
Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 34785cf..8a94bee 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -680,66 +680,6 @@ public class TestIcebergFilesCommitter extends TableTestBase {
}
@Test
- public void testValidateDataFileExist() throws Exception {
- Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2);
- long timestamp = 0;
- long checkpoint = 10;
- JobID jobId = new JobID();
- FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
-
- RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
- DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(insert1));
-
- try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
- harness.setup();
- harness.open();
-
- // Txn#1: insert the row <1, 'aaa'>
- harness.processElement(WriteResult.builder()
- .addDataFiles(dataFile1)
- .build(),
- ++timestamp);
- harness.snapshot(checkpoint, ++timestamp);
- harness.notifyOfCompletedCheckpoint(checkpoint);
-
- // Txn#2: Overwrite the committed data-file-1
- RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
- DataFile dataFile2 = writeDataFile("data-file-2", ImmutableList.of(insert2));
- new TestTableLoader(tablePath)
- .loadTable()
- .newOverwrite()
- .addFile(dataFile2)
- .deleteFile(dataFile1)
- .commit();
- }
-
- try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
- harness.setup();
- harness.open();
-
- // Txn#3: position-delete the <1, 'aaa'> (NOT committed).
- DeleteFile deleteFile1 = writePosDeleteFile(appenderFactory,
- "pos-delete-file-1",
- ImmutableList.of(Pair.of(dataFile1.path(), 0L)));
- harness.processElement(WriteResult.builder()
- .addDeleteFiles(deleteFile1)
- .addReferencedDataFiles(dataFile1.path())
- .build(),
- ++timestamp);
- harness.snapshot(++checkpoint, ++timestamp);
-
- // Txn#3: validate will be failure when committing.
- final long currentCheckpointId = checkpoint;
- AssertHelpers.assertThrows("Validation should be failure because of non-exist data files.",
- ValidationException.class, "Cannot commit, missing data files",
- () -> {
- harness.notifyOfCompletedCheckpoint(currentCheckpointId);
- return null;
- });
- }
- }
-
- @Test
public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2);