Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/16 09:45:03 UTC

[GitHub] [iceberg] openinx opened a new pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

openinx opened a new pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545557808



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
##########
@@ -63,4 +66,53 @@ static ManifestOutputFileFactory createOutputFileFactory(Table table, String fli
     TableOperations ops = ((HasTableOperations) table).operations();
     return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
   }
+
+  static DeltaManifests writeCompletedFiles(WriteResult result,

Review comment:
    Thanks for the explanation, @rdblue. I think it's correct to validate the data files in `RowDelta#commit`. I'll provide an extra unit test to address it.
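
    A minimal sketch, assuming the committer's merged `WriteResult` is in scope as `result`, of what such a `RowDelta` commit with data file validation could look like (not the PR's exact code):
    ```java
    // Commit data files and delete files in one RowDelta transaction, validating
    // that every data file referenced by the delete files still exists.
    RowDelta rowDelta = table.newRowDelta();
    Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
    Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
    rowDelta
        .validateDataFilesExist(Arrays.asList(result.referencedDataFiles()))
        .commit();
    ```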






[GitHub] [iceberg] openinx commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545703439



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -229,33 +232,71 @@ private void commitUpToCheckpoint(NavigableMap<Long, byte[]> manifestsMap,
     }
   }
 
-  private void replacePartitions(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
+  private void replacePartitions(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId,
+                                 long checkpointId) {
+    // Merge all the pending results into a single write result.
+    WriteResult result = WriteResult.builder().add(pendingResults.values()).build();
+
+    // Partition overwrite does not support delete files.
+    Preconditions.checkArgument(result.deleteFiles().length == 0,
+        "Cannot overwrite partitions with delete files.");
     ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
 
+    // Commit the overwrite transaction.
     int numFiles = 0;
-    for (DataFile file : dataFiles) {
+    for (DataFile file : result.dataFiles()) {
       numFiles += 1;
       dynamicOverwrite.addFile(file);
     }
 
-    commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite", newFlinkJobId, checkpointId);
+    commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId);
   }
 
-  private void append(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
-    AppendFiles appendFiles = table.newAppend();
+  private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
+    // Merge all pending results into a single write result.
+    WriteResult mergedResult = WriteResult.builder().add(pendingResults.values()).build();
 
-    int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      appendFiles.appendFile(file);
-    }
+    if (mergedResult.deleteFiles().length < 1) {
+      // To be compatible with iceberg format V1.
+      AppendFiles appendFiles = table.newAppend();
+
+      int numFiles = 0;
+      for (DataFile file : mergedResult.dataFiles()) {
+        numFiles += 1;
+        appendFiles.appendFile(file);
+      }
+
+      commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
+    } else {
+      // To be compatible with iceberg format V2.
+      for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
+        // We don't commit the merged result into a single transaction because for the sequential transaction txn1 and

Review comment:
       I've addressed this case in this unit test [here](https://github.com/apache/iceberg/pull/1939/commits/4e769b2c522c8ab7df7bf10eb267c758361e0a01#diff-2bbb61f336190e507c0cb57fc6d01f86b081cf150b460086b89b035d3b4917c8R702).






[GitHub] [iceberg] openinx commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r544862996



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -85,9 +88,9 @@
   // iceberg table when the next checkpoint happen.
   private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
 
-  // The data files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
+  // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
   // 'dataFilesPerCheckpoint'.
-  private final List<DataFile> dataFilesOfCurrentCheckpoint = Lists.newArrayList();
+  private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();

Review comment:
    Yes, it's correct here, because if there are 5 `IcebergStreamWriter`s, then each writer will emit a `WriteResult`. The single-parallelism `IcebergFilesCommitter` will collect all the `WriteResult`s in this `writeResultsOfCurrentCkpt` cache and then merge them into a single `WriteResult`. Finally, it writes those files into delete + data manifests and updates the flink state backend.
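
    A minimal sketch of that merge step, using the `WriteResult.builder()` pattern from the PR diff (variable names as in the committer):
    ```java
    // Merge every WriteResult received since the last checkpoint barrier.
    WriteResult merged = WriteResult.builder()
        .add(writeResultsOfCurrentCkpt)  // the List<WriteResult> cache above
        .build();
    // merged.dataFiles() and merged.deleteFiles() are then written into the
    // data + delete manifests whose serialized bytes go into flink state.
    ```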






[GitHub] [iceberg] openinx commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r544885658



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -184,36 +187,36 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     }
   }
 
-  private void commitUpToCheckpoint(NavigableMap<Long, byte[]> manifestsMap,
+  private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
                                     String newFlinkJobId,
                                     long checkpointId) throws IOException {
-    NavigableMap<Long, byte[]> pendingManifestMap = manifestsMap.headMap(checkpointId, true);
+    NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
 
-    List<ManifestFile> manifestFiles = Lists.newArrayList();
-    List<DataFile> pendingDataFiles = Lists.newArrayList();
-    for (byte[] manifestData : pendingManifestMap.values()) {
-      if (Arrays.equals(EMPTY_MANIFEST_DATA, manifestData)) {
+    List<DeltaManifests> deltaManifestsList = Lists.newArrayList();
+    NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
+    for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
+      if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
         // Skip the empty flink manifest.
         continue;
       }
 
-      ManifestFile manifestFile =
-          SimpleVersionedSerialization.readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE, manifestData);
+      DeltaManifests deltaManifests =
+          SimpleVersionedSerialization.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());

Review comment:
    We will need to maintain compatibility with existing flink state. If the encoding version is 1, then we should read the `byte[]` the way `FlinkManifestSerializer` did.
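
    A hedged sketch of what that version check could look like in `DeltaManifestsSerializer#deserialize`; the version-2 decoding helper is hypothetical:
    ```java
    @Override
    public DeltaManifests deserialize(int version, byte[] serialized) throws IOException {
      if (version == 1) {
        // Old state written by FlinkManifestSerializer: a single data manifest.
        return new DeltaManifests(ManifestFiles.decode(serialized), null);
      } else if (version == 2) {
        return deserializeV2(serialized);  // hypothetical decoder for the new layout
      }
      throw new IOException("Unknown serialized version: " + version);
    }
    ```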






[GitHub] [iceberg] openinx commented on pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#issuecomment-748722385


    @stevenzwu @rdblue Thanks for reviewing and merging. @JingsongLi is currently busy with internal flink/blink development work, so he may not have time to double-check now.




[GitHub] [iceberg] rdblue merged pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939


   




[GitHub] [iceberg] openinx commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r544863707



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java
##########
@@ -19,30 +19,42 @@
 
 package org.apache.iceberg.flink.sink;
 
-import java.io.IOException;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
+import java.util.Iterator;
+import java.util.List;
 import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.ManifestFiles;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.jetbrains.annotations.NotNull;

Review comment:
    Hmm, this could be removed now.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545271812



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
##########
@@ -63,4 +66,53 @@ static ManifestOutputFileFactory createOutputFileFactory(Table table, String fli
     TableOperations ops = ((HasTableOperations) table).operations();
     return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
   }
+
+  static DeltaManifests writeCompletedFiles(WriteResult result,

Review comment:
    Just for my own education: `referencedDataFiles` from `WriteResult` doesn't seem to be used (except in a unit test). What is it for? Do we need to serialize it too?






[GitHub] [iceberg] stevenzwu commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545265016



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java
##########
@@ -19,30 +19,40 @@
 
 package org.apache.iceberg.flink.sink;
 
-import java.io.IOException;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
+import java.util.Iterator;
+import java.util.List;
 import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.ManifestFiles;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-class FlinkManifestSerializer implements SimpleVersionedSerializer<ManifestFile> {
-  private static final int VERSION_NUM = 1;
-  static final FlinkManifestSerializer INSTANCE = new FlinkManifestSerializer();
+class DeltaManifests implements Iterable<ManifestFile> {
 
-  @Override
-  public int getVersion() {
-    return VERSION_NUM;
+  private final ManifestFile dataManifest;
+  private final ManifestFile deleteManifest;
+
+  DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) {
+    this.dataManifest = dataManifest;
+    this.deleteManifest = deleteManifest;
   }
 
-  @Override
-  public byte[] serialize(ManifestFile manifestFile) throws IOException {
-    Preconditions.checkNotNull(manifestFile, "ManifestFile to be serialized should not be null");
+  ManifestFile dataManifest() {
+    return dataManifest;
+  }
 
-    return ManifestFiles.encode(manifestFile);
+  ManifestFile deleteManifest() {
+    return deleteManifest;
   }
 
   @Override
-  public ManifestFile deserialize(int version, byte[] serialized) throws IOException {
-    return ManifestFiles.decode(serialized);
+  public Iterator<ManifestFile> iterator() {

Review comment:
    does this need to implement `Iterable`? It seems only needed for `Iterables.addAll(manifests, deltaManifests);`. Is it simpler to directly call the two getters?
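
    For example, a sketch of that call site using the two getters instead:
    ```java
    // Collect this checkpoint's manifests via the explicit getters.
    if (deltaManifests.dataManifest() != null) {
      manifests.add(deltaManifests.dataManifest());
    }
    if (deltaManifests.deleteManifest() != null) {
      manifests.add(deltaManifests.deleteManifest());
    }
    ```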






[GitHub] [iceberg] rdblue commented on pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#issuecomment-747153858


    Looks good overall, though I didn't look into the tests very thoroughly. Since this is getting into quite a bit of Flink logic, I'd appreciate it if @JingsongLi and @stevenzwu could also take a look and review.




[GitHub] [iceberg] openinx commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545700153



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
##########
@@ -63,4 +66,53 @@ static ManifestOutputFileFactory createOutputFileFactory(Table table, String fli
     TableOperations ops = ((HasTableOperations) table).operations();
     return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
   }
+
+  static DeltaManifests writeCompletedFiles(WriteResult result,
+                                            Supplier<OutputFile> outputFileSupplier,
+                                            PartitionSpec spec) throws IOException {
+
+    ManifestFile dataManifest = null;
+    ManifestFile deleteManifest = null;
+
+    // Write the completed data files into a newly created data manifest file.
+    if (result.dataFiles() != null && result.dataFiles().length > 0) {
+      dataManifest = writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
+    }
+
+    // Write the completed delete files into a newly created delete manifest file.
+    if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
+      OutputFile deleteManifestFile = outputFileSupplier.get();
+
+      ManifestWriter<DeleteFile> deleteManifestWriter = ManifestFiles.writeDeleteManifest(FORMAT_V2, spec,
+          deleteManifestFile, DUMMY_SNAPSHOT_ID);
+      try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
+        for (DeleteFile deleteFile : result.deleteFiles()) {
+          writer.add(deleteFile);
+        }
+      }
+
+      deleteManifest = deleteManifestWriter.toManifestFile();
+    }
+
+    return new DeltaManifests(dataManifest, deleteManifest);

Review comment:
    We have a similar discussion [here](https://github.com/apache/iceberg/pull/1185#discussion_r474420170). Even if the `WriteResult` is empty (NOT null; null means nobody emitted a result to the `IcebergFilesCommitter`, while an empty `WriteResult` means the `IcebergStreamWriter` did not write any new data but still emitted a `WriteResult` with zero data files and zero delete files to the downstream `IcebergFilesCommitter`), we'd better commit to the iceberg txn, so that the flink streaming job won't fail easily when expiring an old snapshot (since at that time we did not write any new records).






[GitHub] [iceberg] openinx commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545715370



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -184,78 +185,106 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     }
   }
 
-  private void commitUpToCheckpoint(NavigableMap<Long, byte[]> manifestsMap,
+  private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
                                     String newFlinkJobId,
                                     long checkpointId) throws IOException {
-    NavigableMap<Long, byte[]> pendingManifestMap = manifestsMap.headMap(checkpointId, true);
+    NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
 
-    List<ManifestFile> manifestFiles = Lists.newArrayList();
-    List<DataFile> pendingDataFiles = Lists.newArrayList();
-    for (byte[] manifestData : pendingManifestMap.values()) {
-      if (Arrays.equals(EMPTY_MANIFEST_DATA, manifestData)) {
+    List<ManifestFile> manifests = Lists.newArrayList();
+    NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
+    for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
+      if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
         // Skip the empty flink manifest.
         continue;
       }
 
-      ManifestFile manifestFile =
-          SimpleVersionedSerialization.readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE, manifestData);
-
-      manifestFiles.add(manifestFile);
-      pendingDataFiles.addAll(FlinkManifestUtil.readDataFiles(manifestFile, table.io()));
+      DeltaManifests deltaManifests = SimpleVersionedSerialization
+          .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
+      pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
+      Iterables.addAll(manifests, deltaManifests);
     }
 
     if (replacePartitions) {
-      replacePartitions(pendingDataFiles, newFlinkJobId, checkpointId);
+      replacePartitions(pendingResults, newFlinkJobId, checkpointId);
     } else {
-      append(pendingDataFiles, newFlinkJobId, checkpointId);
+      commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
     }
 
-    pendingManifestMap.clear();
+    pendingMap.clear();
 
-    // Delete the committed manifests and clear the committed data files from dataFilesPerCheckpoint.
-    for (ManifestFile manifestFile : manifestFiles) {
+    // Delete the committed manifests.
+    for (ManifestFile manifest : manifests) {
       try {
-        table.io().deleteFile(manifestFile.path());
+        table.io().deleteFile(manifest.path());
       } catch (Exception e) {
         // The flink manifests cleaning failure shouldn't abort the completed checkpoint.
         String details = MoreObjects.toStringHelper(this)
             .add("flinkJobId", newFlinkJobId)
             .add("checkpointId", checkpointId)
-            .add("manifestPath", manifestFile.path())
+            .add("manifestPath", manifest.path())
             .toString();
         LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}",
             details, e);
       }
     }
   }
 
-  private void replacePartitions(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
+  private void replacePartitions(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId,
+                                 long checkpointId) {
+    // Partition overwrite does not support delete files.
+    int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
+    Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions with delete files.");
+
+    // Commit the overwrite transaction.
     ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
 
     int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      dynamicOverwrite.addFile(file);
+    for (WriteResult result : pendingResults.values()) {
+      numFiles += result.dataFiles().length;
+      Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
     }
 
-    commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite", newFlinkJobId, checkpointId);
+    commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId);
   }
 
-  private void append(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
-    AppendFiles appendFiles = table.newAppend();
+  private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
+    int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
 
-    int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      appendFiles.appendFile(file);
-    }
+    if (deleteFilesNum == 0) {
+      // To be compatible with iceberg format V1.
+      AppendFiles appendFiles = table.newAppend();
 
-    commitOperation(appendFiles, numFiles, "append", newFlinkJobId, checkpointId);
+      int numFiles = 0;
+      for (WriteResult result : pendingResults.values()) {

Review comment:
    It sounds like a separate improvement, so I created an issue for it; let's discuss there: https://github.com/apache/iceberg/issues/1959.






[GitHub] [iceberg] rdblue commented on pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#issuecomment-748231753


   +1
   
   It would be great to have a review from @JingsongLi as well, but I'm going to go ahead and commit this since it looks good to @stevenzwu.




[GitHub] [iceberg] openinx commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545721548



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
##########
@@ -63,4 +66,53 @@ static ManifestOutputFileFactory createOutputFileFactory(Table table, String fli
     TableOperations ops = ((HasTableOperations) table).operations();
     return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
   }
+
+  static DeltaManifests writeCompletedFiles(WriteResult result,

Review comment:
    This [unit test](https://github.com/apache/iceberg/pull/1939/files#diff-2bbb61f336190e507c0cb57fc6d01f86b081cf150b460086b89b035d3b4917c8R658) addresses the data file validation issue.






[GitHub] [iceberg] openinx commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545701370



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
##########
@@ -63,4 +66,53 @@ static ManifestOutputFileFactory createOutputFileFactory(Table table, String fli
     TableOperations ops = ((HasTableOperations) table).operations();
     return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
   }
+
+  static DeltaManifests writeCompletedFiles(WriteResult result,
+                                            Supplier<OutputFile> outputFileSupplier,
+                                            PartitionSpec spec) throws IOException {
+
+    ManifestFile dataManifest = null;
+    ManifestFile deleteManifest = null;
+
+    // Write the completed data files into a newly created data manifest file.
+    if (result.dataFiles() != null && result.dataFiles().length > 0) {
+      dataManifest = writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
+    }
+
+    // Write the completed delete files into a newly created delete manifest file.
+    if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
+      OutputFile deleteManifestFile = outputFileSupplier.get();
+
+      ManifestWriter<DeleteFile> deleteManifestWriter = ManifestFiles.writeDeleteManifest(FORMAT_V2, spec,
+          deleteManifestFile, DUMMY_SNAPSHOT_ID);
+      try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
+        for (DeleteFile deleteFile : result.deleteFiles()) {
+          writer.add(deleteFile);
+        }
+      }
+
+      deleteManifest = deleteManifestWriter.toManifestFile();
+    }
+
+    return new DeltaManifests(dataManifest, deleteManifest);

Review comment:
    About this question, I think we'd better keep the dummy `DeltaManifests` in state, although it has no data files and no delete files.






[GitHub] [iceberg] openinx commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545556133



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java
##########
@@ -19,30 +19,40 @@
 
 package org.apache.iceberg.flink.sink;
 
-import java.io.IOException;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
+import java.util.Iterator;
+import java.util.List;
 import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.ManifestFiles;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-class FlinkManifestSerializer implements SimpleVersionedSerializer<ManifestFile> {
-  private static final int VERSION_NUM = 1;
-  static final FlinkManifestSerializer INSTANCE = new FlinkManifestSerializer();
+class DeltaManifests implements Iterable<ManifestFile> {
 
-  @Override
-  public int getVersion() {
-    return VERSION_NUM;
+  private final ManifestFile dataManifest;
+  private final ManifestFile deleteManifest;
+
+  DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) {
+    this.dataManifest = dataManifest;
+    this.deleteManifest = deleteManifest;
   }
 
-  @Override
-  public byte[] serialize(ManifestFile manifestFile) throws IOException {
-    Preconditions.checkNotNull(manifestFile, "ManifestFile to be serialized should not be null");
+  ManifestFile dataManifest() {
+    return dataManifest;
+  }
 
-    return ManifestFiles.encode(manifestFile);
+  ManifestFile deleteManifest() {
+    return deleteManifest;
   }
 
   @Override
-  public ManifestFile deserialize(int version, byte[] serialized) throws IOException {
-    return ManifestFiles.decode(serialized);
+  public Iterator<ManifestFile> iterator() {

Review comment:
    OK, agreed, we don't have to introduce the extra `Iterable` complexity.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545471359



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -184,78 +185,106 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     }
   }
 
-  private void commitUpToCheckpoint(NavigableMap<Long, byte[]> manifestsMap,
+  private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
                                     String newFlinkJobId,
                                     long checkpointId) throws IOException {
-    NavigableMap<Long, byte[]> pendingManifestMap = manifestsMap.headMap(checkpointId, true);
+    NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
 
-    List<ManifestFile> manifestFiles = Lists.newArrayList();
-    List<DataFile> pendingDataFiles = Lists.newArrayList();
-    for (byte[] manifestData : pendingManifestMap.values()) {
-      if (Arrays.equals(EMPTY_MANIFEST_DATA, manifestData)) {
+    List<ManifestFile> manifests = Lists.newArrayList();
+    NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
+    for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
+      if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
         // Skip the empty flink manifest.
         continue;
       }
 
-      ManifestFile manifestFile =
-          SimpleVersionedSerialization.readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE, manifestData);
-
-      manifestFiles.add(manifestFile);
-      pendingDataFiles.addAll(FlinkManifestUtil.readDataFiles(manifestFile, table.io()));
+      DeltaManifests deltaManifests = SimpleVersionedSerialization
+          .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
+      pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
+      Iterables.addAll(manifests, deltaManifests);
     }
 
     if (replacePartitions) {
-      replacePartitions(pendingDataFiles, newFlinkJobId, checkpointId);
+      replacePartitions(pendingResults, newFlinkJobId, checkpointId);
     } else {
-      append(pendingDataFiles, newFlinkJobId, checkpointId);
+      commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
     }
 
-    pendingManifestMap.clear();
+    pendingMap.clear();
 
-    // Delete the committed manifests and clear the committed data files from dataFilesPerCheckpoint.
-    for (ManifestFile manifestFile : manifestFiles) {
+    // Delete the committed manifests.
+    for (ManifestFile manifest : manifests) {
       try {
-        table.io().deleteFile(manifestFile.path());
+        table.io().deleteFile(manifest.path());
       } catch (Exception e) {
         // The flink manifests cleaning failure shouldn't abort the completed checkpoint.
         String details = MoreObjects.toStringHelper(this)
             .add("flinkJobId", newFlinkJobId)
             .add("checkpointId", checkpointId)
-            .add("manifestPath", manifestFile.path())
+            .add("manifestPath", manifest.path())
             .toString();
         LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}",
             details, e);
       }
     }
   }
 
-  private void replacePartitions(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
+  private void replacePartitions(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId,
+                                 long checkpointId) {
+    // Partition overwrite does not support delete files.
+    int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
+    Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions with delete files.");
+
+    // Commit the overwrite transaction.
     ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
 
     int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      dynamicOverwrite.addFile(file);
+    for (WriteResult result : pendingResults.values()) {
+      numFiles += result.dataFiles().length;
+      Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
     }
 
-    commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite", newFlinkJobId, checkpointId);
+    commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId);
   }
 
-  private void append(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
-    AppendFiles appendFiles = table.newAppend();
+  private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
+    int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
 
-    int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      appendFiles.appendFile(file);
-    }
+    if (deleteFilesNum == 0) {
+      // To be compatible with iceberg format V1.
+      AppendFiles appendFiles = table.newAppend();
 
-    commitOperation(appendFiles, numFiles, "append", newFlinkJobId, checkpointId);
+      int numFiles = 0;
+      for (WriteResult result : pendingResults.values()) {

Review comment:
       We are using this API from the `AppendFiles` interface. When we had an extended outage and accumulated a few hundred transactions/manifests in the Flink checkpoint state, this helped us avoid rewriting those manifest files; otherwise, the commit can take very long. @rdblue can probably explain it better than I do.
   ```
     AppendFiles appendManifest(ManifestFile file);
   ```
   Here we are merging data files, potentially from multiple checkpoint cycles/manifests, into a single manifest file. Maybe we can add a similar API to the `DeleteFiles` interface?
   ```
     DeleteFiles deleteManifest(ManifestFile file);
   ```
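
   For illustration, an editor's sketch (not code from this PR) of how `appendManifest` lets a backlog of per-checkpoint manifests land in a single snapshot without being rewritten; the `BacklogCommit` helper name is hypothetical:
   ```java
   import java.util.List;
   import org.apache.iceberg.AppendFiles;
   import org.apache.iceberg.ManifestFile;
   import org.apache.iceberg.Table;

   class BacklogCommit {
     // Each manifest was produced by one Flink checkpoint. appendManifest adds
     // it to the new snapshot as-is, instead of re-reading its data files and
     // rewriting them into a fresh manifest, so the commit stays cheap even
     // after hundreds of accumulated checkpoints.
     static void commitBacklog(Table table, List<ManifestFile> checkpointManifests) {
       AppendFiles append = table.newAppend();
       for (ManifestFile manifest : checkpointManifests) {
         append.appendManifest(manifest);
       }
       append.commit();  // one snapshot covers the whole backlog
     }
   }
   ```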






[GitHub] [iceberg] rdblue commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r544749636



##########
File path: core/src/main/java/org/apache/iceberg/io/WriteResult.java
##########
@@ -76,6 +76,11 @@ public Builder add(WriteResult result) {
       return this;
     }
 
+    public Builder add(Iterable<WriteResult> results) {

Review comment:
       Typically, we would follow the Java collection convention and use `addAll`.
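
   At the call sites, a sketch of how the renamed method would read (assuming the rename lands; `pendingResults` is the map used in the committer):
   ```java
   WriteResult merged = WriteResult.builder()
       .addAll(pendingResults.values())  // was add(Iterable<WriteResult>)
       .build();
   ```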






[GitHub] [iceberg] stevenzwu commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545269448



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
##########
@@ -63,4 +66,53 @@ static ManifestOutputFileFactory createOutputFileFactory(Table table, String fli
     TableOperations ops = ((HasTableOperations) table).operations();
     return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
   }
+
+  static DeltaManifests writeCompletedFiles(WriteResult result,
+                                            Supplier<OutputFile> outputFileSupplier,
+                                            PartitionSpec spec) throws IOException {
+
+    ManifestFile dataManifest = null;
+    ManifestFile deleteManifest = null;
+
+    // Write the completed data files into a newly created data manifest file.
+    if (result.dataFiles() != null && result.dataFiles().length > 0) {
+      dataManifest = writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
+    }
+
+    // Write the completed delete files into a newly created delete manifest file.
+    if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
+      OutputFile deleteManifestFile = outputFileSupplier.get();
+
+      ManifestWriter<DeleteFile> deleteManifestWriter = ManifestFiles.writeDeleteManifest(FORMAT_V2, spec,
+          deleteManifestFile, DUMMY_SNAPSHOT_ID);
+      try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
+        for (DeleteFile deleteFile : result.deleteFiles()) {
+          writer.add(deleteFile);
+        }
+      }
+
+      deleteManifest = deleteManifestWriter.toManifestFile();
+    }
+
+    return new DeltaManifests(dataManifest, deleteManifest);

Review comment:
       Do we need to check whether the `WriteResult` is empty (no data files and no delete files)?
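
   For reference, an editor's sketch of the guard being asked about; the `WriteResults` helper class is hypothetical:
   ```java
   import org.apache.iceberg.io.WriteResult;

   class WriteResults {
     // True when the result carries neither data files nor delete files,
     // in which case no manifest needs to be written at all.
     static boolean isEmpty(WriteResult result) {
       boolean noData = result.dataFiles() == null || result.dataFiles().length == 0;
       boolean noDeletes = result.deleteFiles() == null || result.deleteFiles().length == 0;
       return noData && noDeletes;
     }
   }
   ```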






[GitHub] [iceberg] rdblue commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545994813



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -184,78 +185,106 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     }
   }
 
-  private void commitUpToCheckpoint(NavigableMap<Long, byte[]> manifestsMap,
+  private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
                                     String newFlinkJobId,
                                     long checkpointId) throws IOException {
-    NavigableMap<Long, byte[]> pendingManifestMap = manifestsMap.headMap(checkpointId, true);
+    NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
 
-    List<ManifestFile> manifestFiles = Lists.newArrayList();
-    List<DataFile> pendingDataFiles = Lists.newArrayList();
-    for (byte[] manifestData : pendingManifestMap.values()) {
-      if (Arrays.equals(EMPTY_MANIFEST_DATA, manifestData)) {
+    List<ManifestFile> manifests = Lists.newArrayList();
+    NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
+    for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
+      if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
         // Skip the empty flink manifest.
         continue;
       }
 
-      ManifestFile manifestFile =
-          SimpleVersionedSerialization.readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE, manifestData);
-
-      manifestFiles.add(manifestFile);
-      pendingDataFiles.addAll(FlinkManifestUtil.readDataFiles(manifestFile, table.io()));
+      DeltaManifests deltaManifests = SimpleVersionedSerialization
+          .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
+      pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
+      Iterables.addAll(manifests, deltaManifests);
     }
 
     if (replacePartitions) {
-      replacePartitions(pendingDataFiles, newFlinkJobId, checkpointId);
+      replacePartitions(pendingResults, newFlinkJobId, checkpointId);
     } else {
-      append(pendingDataFiles, newFlinkJobId, checkpointId);
+      commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
     }
 
-    pendingManifestMap.clear();
+    pendingMap.clear();
 
-    // Delete the committed manifests and clear the committed data files from dataFilesPerCheckpoint.
-    for (ManifestFile manifestFile : manifestFiles) {
+    // Delete the committed manifests.
+    for (ManifestFile manifest : manifests) {
       try {
-        table.io().deleteFile(manifestFile.path());
+        table.io().deleteFile(manifest.path());
       } catch (Exception e) {
         // The flink manifests cleaning failure shouldn't abort the completed checkpoint.
         String details = MoreObjects.toStringHelper(this)
             .add("flinkJobId", newFlinkJobId)
             .add("checkpointId", checkpointId)
-            .add("manifestPath", manifestFile.path())
+            .add("manifestPath", manifest.path())
             .toString();
         LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}",
             details, e);
       }
     }
   }
 
-  private void replacePartitions(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
+  private void replacePartitions(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId,
+                                 long checkpointId) {
+    // Partition overwrite does not support delete files.
+    int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
+    Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions with delete files.");
+
+    // Commit the overwrite transaction.
     ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
 
     int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      dynamicOverwrite.addFile(file);
+    for (WriteResult result : pendingResults.values()) {
+      numFiles += result.dataFiles().length;
+      Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
     }
 
-    commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite", newFlinkJobId, checkpointId);
+    commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId);
   }
 
-  private void append(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
-    AppendFiles appendFiles = table.newAppend();
+  private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
+    int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
 
-    int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      appendFiles.appendFile(file);
-    }
+    if (deleteFilesNum == 0) {
+      // To be compatible with iceberg format V1.
+      AppendFiles appendFiles = table.newAppend();
 
-    commitOperation(appendFiles, numFiles, "append", newFlinkJobId, checkpointId);
+      int numFiles = 0;
+      for (WriteResult result : pendingResults.values()) {

Review comment:
       > Maybe we can add a similar API in DeleteFiles interface?
   
   We don't currently do this because we need delete entries to exist when we delete files. That way we can track when something was deleted and clean it up incrementally in `ExpireSnapshots`. If we did have a method like this, it would either always rewrite the manifest with deletes, or it would need to ensure that the added manifest contains only deletes, and neither requirement is very obvious. I think it is better to pass the deleted files through the existing methods.
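
   As a concrete illustration of committing through the existing methods, an editor's sketch (the `DeltaCommit` wrapper is hypothetical):
   ```java
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.DeleteFile;
   import org.apache.iceberg.RowDelta;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.io.WriteResult;

   class DeltaCommit {
     // One checkpoint's WriteResult committed through RowDelta, the existing
     // row-level delta API, rather than appending a pre-written delete manifest.
     static void commit(Table table, WriteResult result) {
       RowDelta rowDelta = table.newRowDelta();
       for (DataFile dataFile : result.dataFiles()) {
         rowDelta.addRows(dataFile);
       }
       for (DeleteFile deleteFile : result.deleteFiles()) {
         rowDelta.addDeletes(deleteFile);
       }
       rowDelta.commit();
     }
   }
   ```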






[GitHub] [iceberg] rdblue commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r544751813



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -85,9 +88,9 @@
   // iceberg table when the next checkpoint happen.
   private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
 
-  // The data files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
+  // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
   // 'dataFilesPerCheckpoint'.
-  private final List<DataFile> dataFilesOfCurrentCheckpoint = Lists.newArrayList();
+  private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();

Review comment:
       Is it correct for this to be a list of write results if a write result keeps track of a list of data files and a list of delete files?






[GitHub] [iceberg] openinx commented on pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#issuecomment-748002552


   @stevenzwu Thanks for your review. I addressed everything except the separate issue #1959. Any other concerns?




[GitHub] [iceberg] openinx commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r544859737



##########
File path: core/src/main/java/org/apache/iceberg/io/WriteResult.java
##########
@@ -76,6 +76,11 @@ public Builder add(WriteResult result) {
       return this;
     }
 
+    public Builder add(Iterable<WriteResult> results) {

Review comment:
       OK, renaming it to `addAll` sounds great to me.






[GitHub] [iceberg] openinx commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r544343472



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -229,33 +232,71 @@ private void commitUpToCheckpoint(NavigableMap<Long, byte[]> manifestsMap,
     }
   }
 
-  private void replacePartitions(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
+  private void replacePartitions(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId,
+                                 long checkpointId) {
+    // Merge all the pending results into a single write result.
+    WriteResult result = WriteResult.builder().add(pendingResults.values()).build();
+
+    // Partition overwrite does not support delete files.
+    Preconditions.checkArgument(result.deleteFiles().length == 0,
+        "Cannot overwrite partitions with delete files.");
     ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
 
+    // Commit the overwrite transaction.
     int numFiles = 0;
-    for (DataFile file : dataFiles) {
+    for (DataFile file : result.dataFiles()) {
       numFiles += 1;
       dynamicOverwrite.addFile(file);
     }
 
-    commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite", newFlinkJobId, checkpointId);
+    commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId);
   }
 
-  private void append(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
-    AppendFiles appendFiles = table.newAppend();
+  private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
+    // Merge all pending results into a single write result.
+    WriteResult mergedResult = WriteResult.builder().add(pendingResults.values()).build();
 
-    int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      appendFiles.appendFile(file);
-    }
+    if (mergedResult.deleteFiles().length < 1) {
+      // To be compatible with iceberg format V1.
+      AppendFiles appendFiles = table.newAppend();
+
+      int numFiles = 0;
+      for (DataFile file : mergedResult.dataFiles()) {
+        numFiles += 1;
+        appendFiles.appendFile(file);
+      }
+
+      commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
+    } else {
+      // To be compatible with iceberg format V2.
+      for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
+        // We don't commit the merged result into a single transaction because for the sequential transaction txn1 and

Review comment:
       I will provide a unit test to address it.






[GitHub] [iceberg] rdblue commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545291361



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
##########
@@ -63,4 +66,53 @@ static ManifestOutputFileFactory createOutputFileFactory(Table table, String fli
     TableOperations ops = ((HasTableOperations) table).operations();
     return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
   }
+
+  static DeltaManifests writeCompletedFiles(WriteResult result,

Review comment:
       We should serialize it and add it to the commit. This is the set of files that is referenced by any positional delete, which identifies deleted rows by file and row position. The commit will validate that all of the files still exist in the table.
   
   This isn't strictly needed for this use case because we know that the position deletes only refer to files that are created in this commit. Since the files are being added in the commit, it isn't possible for some other process to delete some of them from metadata. But it is still good to configure the commit properly in case this gets reused later.
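
   A sketch of wiring that validation into the commit (editor's illustration; it assumes the referenced paths are exposed on `WriteResult`, e.g. via a `referencedDataFiles()` accessor):
   ```java
   import java.util.Arrays;
   import org.apache.iceberg.RowDelta;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.io.WriteResult;

   class ValidatedDeltaCommit {
     // Configure the commit to fail if any data file referenced by a
     // positional delete no longer exists in the table.
     static void commit(Table table, WriteResult result) {
       RowDelta rowDelta = table.newRowDelta()
           .validateDataFilesExist(Arrays.asList(result.referencedDataFiles()));
       // data files and delete files are added here via addRows(...) and
       // addDeletes(...) before the final commit, as in the committer.
       rowDelta.commit();
     }
   }
   ```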






[GitHub] [iceberg] rdblue commented on a change in pull request #1939: Flink: Commit both data files and delete files to iceberg transaction.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r544749042



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java
##########
@@ -19,30 +19,42 @@
 
 package org.apache.iceberg.flink.sink;
 
-import java.io.IOException;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
+import java.util.Iterator;
+import java.util.List;
 import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.ManifestFiles;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.jetbrains.annotations.NotNull;

Review comment:
       Is this used?



