Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/18 10:20:01 UTC

[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #224: [FLINK-28582] Check LSM tree structure when multiple jobs are committing into the same bucket

tsreaper commented on code in PR #224:
URL: https://github.com/apache/flink-table-store/pull/224#discussion_r923186715


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -464,50 +479,88 @@ private boolean tryCommitOnce(
     }
 
     private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
-        Set<ManifestEntry.Identifier> removedFiles =
-                changes.stream()
-                        .filter(e -> e.kind().equals(FileKind.DELETE))
-                        .map(ManifestEntry::identifier)
-                        .collect(Collectors.toSet());
-        if (removedFiles.isEmpty()) {
-            // early exit for append only changes
-            return;
-        }
-
         List<BinaryRowData> changedPartitions =
                 changes.stream()
                         .map(ManifestEntry::partition)
                         .distinct()
                         .collect(Collectors.toList());
+        List<ManifestEntry> currentEntries;
         try {
-            for (ManifestEntry entry :
+            currentEntries =
                     scan.withSnapshot(snapshotId)
                             .withPartitionFilter(changedPartitions)
                             .plan()
-                            .files()) {
-                removedFiles.remove(entry.identifier());
-            }
+                            .files();
         } catch (Throwable e) {
             throw new RuntimeException("Cannot determine if conflicts exist.", e);
         }
 
+        noConflictsForDeletedFilesOrFail(currentEntries, changes);
+        noConflictsForLsmOrFail(currentEntries, changes);
+    }
+
+    private void noConflictsForDeletedFilesOrFail(

Review Comment:
   This method throws an exception with a more specific message. If we only used `mergeManifestEntries`, the exception would say "manifest files are corrupted", which is not actually the case here: the manifests are fine, and the real problem is a conflict with another commit.
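
   As a rough illustration of the point above, here is a minimal, self-contained sketch of a dedicated deleted-file check that reports a conflict-specific message. It is not the code from this PR: `ConflictCheckSketch`, `Change`, `Kind`, the use of plain strings in place of `ManifestEntry.Identifier`, and the exception type and message text are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Simplified stand-ins for the real classes; every name here is hypothetical.
class ConflictCheckSketch {

    enum Kind { ADD, DELETE }

    static final class Change {
        final Kind kind;
        final String file; // stands in for ManifestEntry.Identifier

        Change(Kind kind, String file) {
            this.kind = kind;
            this.file = file;
        }
    }

    // Fails with a conflict-specific message if a file this commit wants to
    // delete is not present in the snapshot it is committing against.
    static void noConflictsForDeletedFilesOrFail(Set<String> currentFiles, List<Change> changes) {
        Set<String> missing =
                changes.stream()
                        .filter(c -> c.kind == Kind.DELETE)
                        .map(c -> c.file)
                        .filter(f -> !currentFiles.contains(f))
                        .collect(Collectors.toCollection(TreeSet::new));
        if (!missing.isEmpty()) {
            // The message names the actual cause (a conflicting commit),
            // rather than blaming the manifest files.
            throw new IllegalStateException(
                    "Conflict detected: files to delete are missing from the current snapshot: "
                            + missing);
        }
    }

    public static void main(String[] args) {
        Set<String> current = new HashSet<>(Arrays.asList("f1", "f2"));

        // Deleting a file that still exists passes silently.
        noConflictsForDeletedFilesOrFail(
                current, Arrays.asList(new Change(Kind.DELETE, "f1")));

        // Deleting a file that is already gone is reported as a conflict.
        try {
            noConflictsForDeletedFilesOrFail(
                    current, Arrays.asList(new Change(Kind.DELETE, "f9")));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   Keeping this check separate from a generic merge step means the failure message can describe the conflict directly, which is the motivation given in the comment above.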


