Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/18 07:23:28 UTC

[GitHub] [flink-table-store] tsreaper opened a new pull request, #224: [FLINK-28582] Check LSM tree structure when multiple jobs are committing into the same bucket

tsreaper opened a new pull request, #224:
URL: https://github.com/apache/flink-table-store/pull/224

   Currently `FileStoreCommitImpl` only checks for conflicts by verifying that the files we're going to delete (due to compaction) still exist.
   
   However, consider two jobs committing into the same LSM tree at the same time. For their first compactions no conflict is detected, because each job only deletes its own level 0 files. But both jobs produce level 1 files, and the key ranges of these files may overlap, which violates our LSM tree structure.
   
   This PR fixes this issue.
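   The structural check described above can be sketched as follows. This is a minimal illustration with plain int keys and hypothetical names (`FileRange`, `hasOverlap`), not the actual `FileStoreCommitImpl` code: on every LSM level >= 1, key ranges must be disjoint, so after sorting by min key any violation shows up between adjacent files.

   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;

   // Sketch: files on the same LSM level (>= 1) must have disjoint
   // [minKey, maxKey] ranges. Names here are illustrative only.
   public class LevelOverlapCheck {

       // Simplified stand-in for a data file's key range on one level.
       static final class FileRange {
           final int minKey;
           final int maxKey;

           FileRange(int minKey, int maxKey) {
               this.minKey = minKey;
               this.maxKey = maxKey;
           }
       }

       // Sort by minKey; then any overlap must appear between adjacent files,
       // so a single linear pass suffices.
       static boolean hasOverlap(List<FileRange> files) {
           List<FileRange> sorted = new ArrayList<>(files);
           sorted.sort(Comparator.comparingInt((FileRange f) -> f.minKey));
           for (int i = 0; i + 1 < sorted.size(); i++) {
               // >= because a shared boundary key already means both files
               // contain that key, i.e. the ranges are not disjoint
               if (sorted.get(i).maxKey >= sorted.get(i + 1).minKey) {
                   return true;
               }
           }
           return false;
       }
   }
   ```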


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #224: [FLINK-28582] Check LSM tree structure when multiple jobs are committing into the same bucket

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #224:
URL: https://github.com/apache/flink-table-store/pull/224#discussion_r923186715


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -464,50 +479,88 @@ private boolean tryCommitOnce(
     }
 
     private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
-        Set<ManifestEntry.Identifier> removedFiles =
-                changes.stream()
-                        .filter(e -> e.kind().equals(FileKind.DELETE))
-                        .map(ManifestEntry::identifier)
-                        .collect(Collectors.toSet());
-        if (removedFiles.isEmpty()) {
-            // early exit for append only changes
-            return;
-        }
-
         List<BinaryRowData> changedPartitions =
                 changes.stream()
                         .map(ManifestEntry::partition)
                         .distinct()
                         .collect(Collectors.toList());
+        List<ManifestEntry> currentEntries;
         try {
-            for (ManifestEntry entry :
+            currentEntries =
                     scan.withSnapshot(snapshotId)
                             .withPartitionFilter(changedPartitions)
                             .plan()
-                            .files()) {
-                removedFiles.remove(entry.identifier());
-            }
+                            .files();
         } catch (Throwable e) {
             throw new RuntimeException("Cannot determine if conflicts exist.", e);
         }
 
+        noConflictsForDeletedFilesOrFail(currentEntries, changes);
+        noConflictsForLsmOrFail(currentEntries, changes);
+    }
+
+    private void noConflictsForDeletedFilesOrFail(

Review Comment:
   This method throws an exception with a more specific message. If we just used `mergeManifestEntries`, the exception would say "manifest files are corrupted", which is actually not the case.





[GitHub] [flink-table-store] JingsongLi merged pull request #224: [FLINK-28582] Check LSM tree structure when multiple jobs are committing into the same bucket

Posted by GitBox <gi...@apache.org>.
JingsongLi merged PR #224:
URL: https://github.com/apache/flink-table-store/pull/224




[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #224: [FLINK-28582] Check LSM tree structure when multiple jobs are committing into the same bucket

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #224:
URL: https://github.com/apache/flink-table-store/pull/224#discussion_r923192366


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -464,50 +479,88 @@ private boolean tryCommitOnce(
     }
 
     private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
-        Set<ManifestEntry.Identifier> removedFiles =
-                changes.stream()
-                        .filter(e -> e.kind().equals(FileKind.DELETE))
-                        .map(ManifestEntry::identifier)
-                        .collect(Collectors.toSet());
-        if (removedFiles.isEmpty()) {
-            // early exit for append only changes
-            return;
-        }
-
         List<BinaryRowData> changedPartitions =
                 changes.stream()
                         .map(ManifestEntry::partition)
                         .distinct()
                         .collect(Collectors.toList());
+        List<ManifestEntry> currentEntries;
         try {
-            for (ManifestEntry entry :
+            currentEntries =
                     scan.withSnapshot(snapshotId)
                             .withPartitionFilter(changedPartitions)
                             .plan()
-                            .files()) {
-                removedFiles.remove(entry.identifier());
-            }
+                            .files();
         } catch (Throwable e) {
             throw new RuntimeException("Cannot determine if conflicts exist.", e);
         }
 
+        noConflictsForDeletedFilesOrFail(currentEntries, changes);
+        noConflictsForLsmOrFail(currentEntries, changes);
+    }
+
+    private void noConflictsForDeletedFilesOrFail(

Review Comment:
   Can we improve the `mergeManifestEntries` method? Maybe pass an exception `Supplier`?
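   The suggestion above could look like the following sketch. The helper name, its signature, and the duplicate check are hypothetical, not the real `mergeManifestEntries` API; the point is only that the caller supplies the exception, so a conflict check can report "conflict" rather than "manifest corrupted".

   ```java
   import java.util.HashSet;
   import java.util.Set;
   import java.util.function.Supplier;

   // Sketch of an exception-Supplier-based validation helper: the error
   // message depends on the call site, not on this shared helper.
   public class ConflictCheck {

       static void checkNoDuplicates(int[] fileIds, Supplier<RuntimeException> error) {
           Set<Integer> seen = new HashSet<>();
           for (int id : fileIds) {
               if (!seen.add(id)) {
                   // throw whatever exception the caller considers appropriate
                   throw error.get();
               }
           }
       }
   }
   ```

   A commit path could then call `checkNoDuplicates(ids, () -> new RuntimeException("File deletion conflicts detected"))`, while a manifest reader passes a "manifest files are corrupted" supplier instead.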





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #224: [FLINK-28582] Check LSM tree structure when multiple jobs are committing into the same bucket

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #224:
URL: https://github.com/apache/flink-table-store/pull/224#discussion_r924098371


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java:
##########
@@ -29,6 +29,8 @@ public interface FileStoreCommit {
     /** With global lock. */
     FileStoreCommit withLock(Lock lock);
 
+    FileStoreCommit withCreateEmptyCommit(boolean commitEmptyNewFiles);

Review Comment:
   `commitEmptyNewFiles` -> `createEmptyCommit`?





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #224: [FLINK-28582] Check LSM tree structure when multiple jobs are committing into the same bucket

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #224:
URL: https://github.com/apache/flink-table-store/pull/224#discussion_r923145922


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -121,6 +130,12 @@ public FileStoreCommit withLock(Lock lock) {
         return this;
     }
 
+    @Override
+    public FileStoreCommit withCommitEmptyNewFiles(boolean commitEmptyNewFiles) {

Review Comment:
   `withCreateEmptyCommit`?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -464,50 +479,88 @@ private boolean tryCommitOnce(
     }
 
     private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
-        Set<ManifestEntry.Identifier> removedFiles =
-                changes.stream()
-                        .filter(e -> e.kind().equals(FileKind.DELETE))
-                        .map(ManifestEntry::identifier)
-                        .collect(Collectors.toSet());
-        if (removedFiles.isEmpty()) {
-            // early exit for append only changes
-            return;
-        }
-
         List<BinaryRowData> changedPartitions =
                 changes.stream()
                         .map(ManifestEntry::partition)
                         .distinct()
                         .collect(Collectors.toList());
+        List<ManifestEntry> currentEntries;
         try {
-            for (ManifestEntry entry :
+            currentEntries =
                     scan.withSnapshot(snapshotId)
                             .withPartitionFilter(changedPartitions)
                             .plan()
-                            .files()) {
-                removedFiles.remove(entry.identifier());
-            }
+                            .files();
         } catch (Throwable e) {
             throw new RuntimeException("Cannot determine if conflicts exist.", e);
         }
 
+        noConflictsForDeletedFilesOrFail(currentEntries, changes);
+        noConflictsForLsmOrFail(currentEntries, changes);
+    }
+
+    private void noConflictsForDeletedFilesOrFail(
+            List<ManifestEntry> currentEntries, List<ManifestEntry> changes) {
+        Set<ManifestEntry.Identifier> removedFiles =
+                changes.stream()
+                        .filter(e -> e.kind().equals(FileKind.DELETE))
+                        .map(ManifestEntry::identifier)
+                        .collect(Collectors.toSet());
+        if (removedFiles.isEmpty()) {
+            // early exit for append only changes
+            return;
+        }
+
+        // check that files to be removed are not yet removed
+        for (ManifestEntry entry : currentEntries) {
+            removedFiles.remove(entry.identifier());
+        }
         if (!removedFiles.isEmpty()) {
             throw new RuntimeException(
-                    "Conflicts detected on:\n"
+                    "File deletion conflicts detected! Give up committing compact changes. Conflict files are:\n"
                             + removedFiles.stream()
-                                    .map(
-                                            i ->
-                                                    pathFactory.getPartitionString(i.partition)
-                                                            + ", bucket "
-                                                            + i.bucket
-                                                            + ", level "
-                                                            + i.level
-                                                            + ", file "
-                                                            + i.fileName)
+                                    .map(i -> i.toString(pathFactory))
                                     .collect(Collectors.joining("\n")));
         }
     }
 
+    private void noConflictsForLsmOrFail(
+            List<ManifestEntry> currentEntries, List<ManifestEntry> changes) {
+        if (keyComparator == null) {
+            return;
+        }
+
+        List<ManifestEntry> allEntries = new ArrayList<>();
+        allEntries.addAll(currentEntries);
+        allEntries.addAll(changes);
+        Map<LevelIdentifier, List<ManifestEntry>> levels = new HashMap<>();
+        for (ManifestEntry entry : ManifestEntry.mergeManifestEntries(allEntries)) {
+            int level = entry.file().level();
+            if (level >= 1) {
+                levels.compute(

Review Comment:
   `computeIfAbsent`?
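   The `compute(...)` call in the diff only ever creates a missing list, which is exactly what `computeIfAbsent` expresses directly. A minimal illustration (hypothetical `group` helper with plain strings and int levels, not the table store types):

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Sketch: grouping file names by level with computeIfAbsent, which
   // creates the list only when the key is absent and then returns it.
   public class GroupByLevel {

       static Map<Integer, List<String>> group(List<String> files, List<Integer> levels) {
           Map<Integer, List<String>> byLevel = new HashMap<>();
           for (int i = 0; i < files.size(); i++) {
               byLevel.computeIfAbsent(levels.get(i), k -> new ArrayList<>())
                       .add(files.get(i));
           }
           return byLevel;
       }
   }
   ```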



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -464,50 +479,88 @@ private boolean tryCommitOnce(
     }
 
     private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
-        Set<ManifestEntry.Identifier> removedFiles =
-                changes.stream()
-                        .filter(e -> e.kind().equals(FileKind.DELETE))
-                        .map(ManifestEntry::identifier)
-                        .collect(Collectors.toSet());
-        if (removedFiles.isEmpty()) {
-            // early exit for append only changes
-            return;
-        }
-
         List<BinaryRowData> changedPartitions =
                 changes.stream()
                         .map(ManifestEntry::partition)
                         .distinct()
                         .collect(Collectors.toList());
+        List<ManifestEntry> currentEntries;
         try {
-            for (ManifestEntry entry :
+            currentEntries =
                     scan.withSnapshot(snapshotId)
                             .withPartitionFilter(changedPartitions)
                             .plan()
-                            .files()) {
-                removedFiles.remove(entry.identifier());
-            }
+                            .files();
         } catch (Throwable e) {
             throw new RuntimeException("Cannot determine if conflicts exist.", e);
         }
 
+        noConflictsForDeletedFilesOrFail(currentEntries, changes);
+        noConflictsForLsmOrFail(currentEntries, changes);
+    }
+
+    private void noConflictsForDeletedFilesOrFail(

Review Comment:
   Do we need this method? Won't `mergeManifestEntries` throw an exception?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -464,50 +479,88 @@ private boolean tryCommitOnce(
     }
 
     private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
-        Set<ManifestEntry.Identifier> removedFiles =
-                changes.stream()
-                        .filter(e -> e.kind().equals(FileKind.DELETE))
-                        .map(ManifestEntry::identifier)
-                        .collect(Collectors.toSet());
-        if (removedFiles.isEmpty()) {
-            // early exit for append only changes
-            return;
-        }
-
         List<BinaryRowData> changedPartitions =
                 changes.stream()
                         .map(ManifestEntry::partition)
                         .distinct()
                         .collect(Collectors.toList());
+        List<ManifestEntry> currentEntries;
         try {
-            for (ManifestEntry entry :
+            currentEntries =
                     scan.withSnapshot(snapshotId)
                             .withPartitionFilter(changedPartitions)
                             .plan()
-                            .files()) {
-                removedFiles.remove(entry.identifier());
-            }
+                            .files();
         } catch (Throwable e) {
             throw new RuntimeException("Cannot determine if conflicts exist.", e);
         }
 
+        noConflictsForDeletedFilesOrFail(currentEntries, changes);
+        noConflictsForLsmOrFail(currentEntries, changes);
+    }
+
+    private void noConflictsForDeletedFilesOrFail(
+            List<ManifestEntry> currentEntries, List<ManifestEntry> changes) {
+        Set<ManifestEntry.Identifier> removedFiles =
+                changes.stream()
+                        .filter(e -> e.kind().equals(FileKind.DELETE))
+                        .map(ManifestEntry::identifier)
+                        .collect(Collectors.toSet());
+        if (removedFiles.isEmpty()) {
+            // early exit for append only changes
+            return;
+        }
+
+        // check that files to be removed are not yet removed
+        for (ManifestEntry entry : currentEntries) {
+            removedFiles.remove(entry.identifier());
+        }
         if (!removedFiles.isEmpty()) {
             throw new RuntimeException(
-                    "Conflicts detected on:\n"
+                    "File deletion conflicts detected! Give up committing compact changes. Conflict files are:\n"
                             + removedFiles.stream()
-                                    .map(
-                                            i ->
-                                                    pathFactory.getPartitionString(i.partition)
-                                                            + ", bucket "
-                                                            + i.bucket
-                                                            + ", level "
-                                                            + i.level
-                                                            + ", file "
-                                                            + i.fileName)
+                                    .map(i -> i.toString(pathFactory))
                                     .collect(Collectors.joining("\n")));
         }
     }
 
+    private void noConflictsForLsmOrFail(
+            List<ManifestEntry> currentEntries, List<ManifestEntry> changes) {
+        if (keyComparator == null) {
+            return;
+        }
+
+        List<ManifestEntry> allEntries = new ArrayList<>();
+        allEntries.addAll(currentEntries);
+        allEntries.addAll(changes);
+        Map<LevelIdentifier, List<ManifestEntry>> levels = new HashMap<>();
+        for (ManifestEntry entry : ManifestEntry.mergeManifestEntries(allEntries)) {
+            int level = entry.file().level();
+            if (level >= 1) {
+                levels.compute(
+                                new LevelIdentifier(entry.partition(), entry.bucket(), level),
+                                (lv, list) -> list == null ? new ArrayList<>() : list)
+                        .add(entry);
+            }
+        }
+
+        // check for all LSM level >= 1, key ranges of files do not intersect
+        for (List<ManifestEntry> entries : levels.values()) {
+            entries.sort((a, b) -> keyComparator.compare(a.file().minKey(), b.file().minKey()));
+            for (int i = 0; i + 1 < entries.size(); i++) {
+                ManifestEntry a = entries.get(i);
+                ManifestEntry b = entries.get(i + 1);
+                if (keyComparator.compare(a.file().maxKey(), b.file().minKey()) >= 0) {

Review Comment:
   `>` instead of `>=`?





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #224: [FLINK-28582] Check LSM tree structure when multiple jobs are committing into the same bucket

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #224:
URL: https://github.com/apache/flink-table-store/pull/224#discussion_r923160171


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -464,50 +479,88 @@ private boolean tryCommitOnce(
     }
 
     private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
-        Set<ManifestEntry.Identifier> removedFiles =
-                changes.stream()
-                        .filter(e -> e.kind().equals(FileKind.DELETE))
-                        .map(ManifestEntry::identifier)
-                        .collect(Collectors.toSet());
-        if (removedFiles.isEmpty()) {
-            // early exit for append only changes
-            return;
-        }
-
         List<BinaryRowData> changedPartitions =
                 changes.stream()
                         .map(ManifestEntry::partition)
                         .distinct()
                         .collect(Collectors.toList());
+        List<ManifestEntry> currentEntries;
         try {
-            for (ManifestEntry entry :
+            currentEntries =
                     scan.withSnapshot(snapshotId)
                             .withPartitionFilter(changedPartitions)
                             .plan()
-                            .files()) {
-                removedFiles.remove(entry.identifier());
-            }
+                            .files();
         } catch (Throwable e) {
             throw new RuntimeException("Cannot determine if conflicts exist.", e);
         }
 
+        noConflictsForDeletedFilesOrFail(currentEntries, changes);
+        noConflictsForLsmOrFail(currentEntries, changes);
+    }
+
+    private void noConflictsForDeletedFilesOrFail(
+            List<ManifestEntry> currentEntries, List<ManifestEntry> changes) {
+        Set<ManifestEntry.Identifier> removedFiles =
+                changes.stream()
+                        .filter(e -> e.kind().equals(FileKind.DELETE))
+                        .map(ManifestEntry::identifier)
+                        .collect(Collectors.toSet());
+        if (removedFiles.isEmpty()) {
+            // early exit for append only changes
+            return;
+        }
+
+        // check that files to be removed are not yet removed
+        for (ManifestEntry entry : currentEntries) {
+            removedFiles.remove(entry.identifier());
+        }
         if (!removedFiles.isEmpty()) {
             throw new RuntimeException(
-                    "Conflicts detected on:\n"
+                    "File deletion conflicts detected! Give up committing compact changes. Conflict files are:\n"
                             + removedFiles.stream()
-                                    .map(
-                                            i ->
-                                                    pathFactory.getPartitionString(i.partition)
-                                                            + ", bucket "
-                                                            + i.bucket
-                                                            + ", level "
-                                                            + i.level
-                                                            + ", file "
-                                                            + i.fileName)
+                                    .map(i -> i.toString(pathFactory))
                                     .collect(Collectors.joining("\n")));
         }
     }
 
+    private void noConflictsForLsmOrFail(
+            List<ManifestEntry> currentEntries, List<ManifestEntry> changes) {
+        if (keyComparator == null) {
+            return;
+        }
+
+        List<ManifestEntry> allEntries = new ArrayList<>();
+        allEntries.addAll(currentEntries);
+        allEntries.addAll(changes);
+        Map<LevelIdentifier, List<ManifestEntry>> levels = new HashMap<>();
+        for (ManifestEntry entry : ManifestEntry.mergeManifestEntries(allEntries)) {
+            int level = entry.file().level();
+            if (level >= 1) {
+                levels.compute(
+                                new LevelIdentifier(entry.partition(), entry.bucket(), level),
+                                (lv, list) -> list == null ? new ArrayList<>() : list)
+                        .add(entry);
+            }
+        }
+
+        // check for all LSM level >= 1, key ranges of files do not intersect
+        for (List<ManifestEntry> entries : levels.values()) {
+            entries.sort((a, b) -> keyComparator.compare(a.file().minKey(), b.file().minKey()));
+            for (int i = 0; i + 1 < entries.size(); i++) {
+                ManifestEntry a = entries.get(i);
+                ManifestEntry b = entries.get(i + 1);
+                if (keyComparator.compare(a.file().maxKey(), b.file().minKey()) >= 0) {

Review Comment:
   Ah... Yes, it is `>=`.
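   For illustration, the boundary case that makes `>=` the right comparison: if one file's max key equals the next file's min key, both files contain that key, so the level already violates the invariant that key ranges on level >= 1 are disjoint. A tiny sketch with plain int keys (not the real `keyComparator`):

   ```java
   // Sketch: given adjacent files sorted by min key, a shared boundary key
   // (maxKeyA == minKeyB) is a conflict too, hence >= rather than >.
   public class BoundaryCase {

       static boolean rangesConflict(int maxKeyA, int minKeyB) {
           return maxKeyA >= minKeyB;
       }
   }
   ```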





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #224: [FLINK-28582] Check LSM tree structure when multiple jobs are committing into the same bucket

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #224:
URL: https://github.com/apache/flink-table-store/pull/224#discussion_r923154967


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -464,50 +479,88 @@ private boolean tryCommitOnce(
     }
 
     private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
-        Set<ManifestEntry.Identifier> removedFiles =
-                changes.stream()
-                        .filter(e -> e.kind().equals(FileKind.DELETE))
-                        .map(ManifestEntry::identifier)
-                        .collect(Collectors.toSet());
-        if (removedFiles.isEmpty()) {
-            // early exit for append only changes
-            return;
-        }
-
         List<BinaryRowData> changedPartitions =
                 changes.stream()
                         .map(ManifestEntry::partition)
                         .distinct()
                         .collect(Collectors.toList());
+        List<ManifestEntry> currentEntries;
         try {
-            for (ManifestEntry entry :
+            currentEntries =
                     scan.withSnapshot(snapshotId)
                             .withPartitionFilter(changedPartitions)
                             .plan()
-                            .files()) {
-                removedFiles.remove(entry.identifier());
-            }
+                            .files();
         } catch (Throwable e) {
             throw new RuntimeException("Cannot determine if conflicts exist.", e);
         }
 
+        noConflictsForDeletedFilesOrFail(currentEntries, changes);
+        noConflictsForLsmOrFail(currentEntries, changes);
+    }
+
+    private void noConflictsForDeletedFilesOrFail(
+            List<ManifestEntry> currentEntries, List<ManifestEntry> changes) {
+        Set<ManifestEntry.Identifier> removedFiles =
+                changes.stream()
+                        .filter(e -> e.kind().equals(FileKind.DELETE))
+                        .map(ManifestEntry::identifier)
+                        .collect(Collectors.toSet());
+        if (removedFiles.isEmpty()) {
+            // early exit for append only changes
+            return;
+        }
+
+        // check that files to be removed are not yet removed
+        for (ManifestEntry entry : currentEntries) {
+            removedFiles.remove(entry.identifier());
+        }
         if (!removedFiles.isEmpty()) {
             throw new RuntimeException(
-                    "Conflicts detected on:\n"
+                    "File deletion conflicts detected! Give up committing compact changes. Conflict files are:\n"
                             + removedFiles.stream()
-                                    .map(
-                                            i ->
-                                                    pathFactory.getPartitionString(i.partition)
-                                                            + ", bucket "
-                                                            + i.bucket
-                                                            + ", level "
-                                                            + i.level
-                                                            + ", file "
-                                                            + i.fileName)
+                                    .map(i -> i.toString(pathFactory))
                                     .collect(Collectors.joining("\n")));
         }
     }
 
+    private void noConflictsForLsmOrFail(
+            List<ManifestEntry> currentEntries, List<ManifestEntry> changes) {
+        if (keyComparator == null) {
+            return;
+        }
+
+        List<ManifestEntry> allEntries = new ArrayList<>();
+        allEntries.addAll(currentEntries);
+        allEntries.addAll(changes);
+        Map<LevelIdentifier, List<ManifestEntry>> levels = new HashMap<>();
+        for (ManifestEntry entry : ManifestEntry.mergeManifestEntries(allEntries)) {
+            int level = entry.file().level();
+            if (level >= 1) {
+                levels.compute(
+                                new LevelIdentifier(entry.partition(), entry.bucket(), level),
+                                (lv, list) -> list == null ? new ArrayList<>() : list)
+                        .add(entry);
+            }
+        }
+
+        // check for all LSM level >= 1, key ranges of files do not intersect
+        for (List<ManifestEntry> entries : levels.values()) {
+            entries.sort((a, b) -> keyComparator.compare(a.file().minKey(), b.file().minKey()));
+            for (int i = 0; i + 1 < entries.size(); i++) {
+                ManifestEntry a = entries.get(i);
+                ManifestEntry b = entries.get(i + 1);
+                if (keyComparator.compare(a.file().maxKey(), b.file().minKey()) >= 0) {

Review Comment:
   Add a case?


