You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/18 09:46:22 UTC

[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #224: [FLINK-28582] Check LSM tree structure when multiple jobs are committing into the same bucket

JingsongLi commented on code in PR #224:
URL: https://github.com/apache/flink-table-store/pull/224#discussion_r923145922


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -121,6 +130,12 @@ public FileStoreCommit withLock(Lock lock) {
         return this;
     }
 
+    @Override
+    public FileStoreCommit withCommitEmptyNewFiles(boolean commitEmptyNewFiles) {

Review Comment:
   Could this be renamed to `withCreateEmptyCommit`?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -464,50 +479,88 @@ private boolean tryCommitOnce(
     }
 
     private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
-        Set<ManifestEntry.Identifier> removedFiles =
-                changes.stream()
-                        .filter(e -> e.kind().equals(FileKind.DELETE))
-                        .map(ManifestEntry::identifier)
-                        .collect(Collectors.toSet());
-        if (removedFiles.isEmpty()) {
-            // early exit for append only changes
-            return;
-        }
-
         List<BinaryRowData> changedPartitions =
                 changes.stream()
                         .map(ManifestEntry::partition)
                         .distinct()
                         .collect(Collectors.toList());
+        List<ManifestEntry> currentEntries;
         try {
-            for (ManifestEntry entry :
+            currentEntries =
                     scan.withSnapshot(snapshotId)
                             .withPartitionFilter(changedPartitions)
                             .plan()
-                            .files()) {
-                removedFiles.remove(entry.identifier());
-            }
+                            .files();
         } catch (Throwable e) {
             throw new RuntimeException("Cannot determine if conflicts exist.", e);
         }
 
+        noConflictsForDeletedFilesOrFail(currentEntries, changes);
+        noConflictsForLsmOrFail(currentEntries, changes);
+    }
+
+    private void noConflictsForDeletedFilesOrFail(
+            List<ManifestEntry> currentEntries, List<ManifestEntry> changes) {
+        Set<ManifestEntry.Identifier> removedFiles =
+                changes.stream()
+                        .filter(e -> e.kind().equals(FileKind.DELETE))
+                        .map(ManifestEntry::identifier)
+                        .collect(Collectors.toSet());
+        if (removedFiles.isEmpty()) {
+            // early exit for append only changes
+            return;
+        }
+
+        // check that files to be removed are not yet removed
+        for (ManifestEntry entry : currentEntries) {
+            removedFiles.remove(entry.identifier());
+        }
         if (!removedFiles.isEmpty()) {
             throw new RuntimeException(
-                    "Conflicts detected on:\n"
+                    "File deletion conflicts detected! Give up committing compact changes. Conflict files are:\n"
                             + removedFiles.stream()
-                                    .map(
-                                            i ->
-                                                    pathFactory.getPartitionString(i.partition)
-                                                            + ", bucket "
-                                                            + i.bucket
-                                                            + ", level "
-                                                            + i.level
-                                                            + ", file "
-                                                            + i.fileName)
+                                    .map(i -> i.toString(pathFactory))
                                     .collect(Collectors.joining("\n")));
         }
     }
 
+    private void noConflictsForLsmOrFail(
+            List<ManifestEntry> currentEntries, List<ManifestEntry> changes) {
+        if (keyComparator == null) {
+            return;
+        }
+
+        List<ManifestEntry> allEntries = new ArrayList<>();
+        allEntries.addAll(currentEntries);
+        allEntries.addAll(changes);
+        Map<LevelIdentifier, List<ManifestEntry>> levels = new HashMap<>();
+        for (ManifestEntry entry : ManifestEntry.mergeManifestEntries(allEntries)) {
+            int level = entry.file().level();
+            if (level >= 1) {
+                levels.compute(

Review Comment:
   Could this use `computeIfAbsent` instead of `compute`?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -464,50 +479,88 @@ private boolean tryCommitOnce(
     }
 
     private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
-        Set<ManifestEntry.Identifier> removedFiles =
-                changes.stream()
-                        .filter(e -> e.kind().equals(FileKind.DELETE))
-                        .map(ManifestEntry::identifier)
-                        .collect(Collectors.toSet());
-        if (removedFiles.isEmpty()) {
-            // early exit for append only changes
-            return;
-        }
-
         List<BinaryRowData> changedPartitions =
                 changes.stream()
                         .map(ManifestEntry::partition)
                         .distinct()
                         .collect(Collectors.toList());
+        List<ManifestEntry> currentEntries;
         try {
-            for (ManifestEntry entry :
+            currentEntries =
                     scan.withSnapshot(snapshotId)
                             .withPartitionFilter(changedPartitions)
                             .plan()
-                            .files()) {
-                removedFiles.remove(entry.identifier());
-            }
+                            .files();
         } catch (Throwable e) {
             throw new RuntimeException("Cannot determine if conflicts exist.", e);
         }
 
+        noConflictsForDeletedFilesOrFail(currentEntries, changes);
+        noConflictsForLsmOrFail(currentEntries, changes);
+    }
+
+    private void noConflictsForDeletedFilesOrFail(

Review Comment:
   Do we need this method? Won't `mergeManifestEntries` already throw an exception in this case?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -464,50 +479,88 @@ private boolean tryCommitOnce(
     }
 
     private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
-        Set<ManifestEntry.Identifier> removedFiles =
-                changes.stream()
-                        .filter(e -> e.kind().equals(FileKind.DELETE))
-                        .map(ManifestEntry::identifier)
-                        .collect(Collectors.toSet());
-        if (removedFiles.isEmpty()) {
-            // early exit for append only changes
-            return;
-        }
-
         List<BinaryRowData> changedPartitions =
                 changes.stream()
                         .map(ManifestEntry::partition)
                         .distinct()
                         .collect(Collectors.toList());
+        List<ManifestEntry> currentEntries;
         try {
-            for (ManifestEntry entry :
+            currentEntries =
                     scan.withSnapshot(snapshotId)
                             .withPartitionFilter(changedPartitions)
                             .plan()
-                            .files()) {
-                removedFiles.remove(entry.identifier());
-            }
+                            .files();
         } catch (Throwable e) {
             throw new RuntimeException("Cannot determine if conflicts exist.", e);
         }
 
+        noConflictsForDeletedFilesOrFail(currentEntries, changes);
+        noConflictsForLsmOrFail(currentEntries, changes);
+    }
+
+    private void noConflictsForDeletedFilesOrFail(
+            List<ManifestEntry> currentEntries, List<ManifestEntry> changes) {
+        Set<ManifestEntry.Identifier> removedFiles =
+                changes.stream()
+                        .filter(e -> e.kind().equals(FileKind.DELETE))
+                        .map(ManifestEntry::identifier)
+                        .collect(Collectors.toSet());
+        if (removedFiles.isEmpty()) {
+            // early exit for append only changes
+            return;
+        }
+
+        // check that files to be removed are not yet removed
+        for (ManifestEntry entry : currentEntries) {
+            removedFiles.remove(entry.identifier());
+        }
         if (!removedFiles.isEmpty()) {
             throw new RuntimeException(
-                    "Conflicts detected on:\n"
+                    "File deletion conflicts detected! Give up committing compact changes. Conflict files are:\n"
                             + removedFiles.stream()
-                                    .map(
-                                            i ->
-                                                    pathFactory.getPartitionString(i.partition)
-                                                            + ", bucket "
-                                                            + i.bucket
-                                                            + ", level "
-                                                            + i.level
-                                                            + ", file "
-                                                            + i.fileName)
+                                    .map(i -> i.toString(pathFactory))
                                     .collect(Collectors.joining("\n")));
         }
     }
 
+    private void noConflictsForLsmOrFail(
+            List<ManifestEntry> currentEntries, List<ManifestEntry> changes) {
+        if (keyComparator == null) {
+            return;
+        }
+
+        List<ManifestEntry> allEntries = new ArrayList<>();
+        allEntries.addAll(currentEntries);
+        allEntries.addAll(changes);
+        Map<LevelIdentifier, List<ManifestEntry>> levels = new HashMap<>();
+        for (ManifestEntry entry : ManifestEntry.mergeManifestEntries(allEntries)) {
+            int level = entry.file().level();
+            if (level >= 1) {
+                levels.compute(
+                                new LevelIdentifier(entry.partition(), entry.bucket(), level),
+                                (lv, list) -> list == null ? new ArrayList<>() : list)
+                        .add(entry);
+            }
+        }
+
+        // check for all LSM level >= 1, key ranges of files do not intersect
+        for (List<ManifestEntry> entries : levels.values()) {
+            entries.sort((a, b) -> keyComparator.compare(a.file().minKey(), b.file().minKey()));
+            for (int i = 0; i + 1 < entries.size(); i++) {
+                ManifestEntry a = entries.get(i);
+                ManifestEntry b = entries.get(i + 1);
+                if (keyComparator.compare(a.file().maxKey(), b.file().minKey()) >= 0) {

Review Comment:
   Should this be `>` instead of `>=`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org