Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/20 09:06:57 UTC

[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #157: [FLINK-28035] Support rescale overwrite

LadyForest commented on code in PR #157:
URL: https://github.com/apache/flink-table-store/pull/157#discussion_r901430186


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java:
##########
@@ -206,7 +206,22 @@ public Plan plan() {
                             "Unknown value kind " + entry.kind().name());
             }
         }
-        List<ManifestEntry> files = new ArrayList<>(map.values());
+        List<ManifestEntry> files = new ArrayList<>();
+        for (ManifestEntry file : map.values()) {
+            if (checkNumOfBuckets) {
+                Preconditions.checkState(
+                        file.totalBuckets() == numOfBuckets,
+                        "Trying to add file %s "
+                                + "with total bucket number %s, but the current bucket number is %s. Manifest might be corrupted.",
+                        file.file().fileName(),
+                        file.totalBuckets(),
+                        numOfBuckets);
+            }
+
+            if (filterByBucket(file)) {

Review Comment:
   > Can we just keep this as it is? I don't see the need to modify it.
   
   I don't think so. If we filter by bucket before checking, the result might be incorrect.
   
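   E.g.
   The old `numOfBuckets` is 2, and the new `numOfBuckets` is 1.
   As a result, the only possible value for `specifiedBucket` is 0.
   
   Suppose the `entry` has bucket id 1. The existing check `specifiedBucket < entry.totalBuckets()` passes (0 < 2), and the entry is then dropped by the bucket filter, so the result is empty and the bucket number change goes undetected.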
   
   ```java
   private boolean filterByPartitionAndBucket(ManifestEntry entry) {
       if (specifiedBucket != null) {
           Preconditions.checkState(
                   specifiedBucket < entry.totalBuckets(),
                   "Bucket number has been changed. Manifest might be corrupted.");
       }
       return (partitionFilter == null
                       || partitionFilter.test(partitionConverter.convert(entry.partition())))
               && (specifiedBucket == null || entry.bucket() == specifiedBucket);
   }
   ```
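
To make the scenario concrete, here is a minimal standalone sketch of the two orderings. It is not the actual Table Store code: `Entry`, `filterThenCheck`, and `checkThenFilter` are hypothetical stand-ins for `ManifestEntry` and the scan logic, the partition filter is left out, and the check is applied unconditionally rather than behind `checkNumOfBuckets`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Standalone sketch; none of these names exist in the Table Store code base. */
final class BucketCheckSketch {

    /** Hypothetical stand-in for ManifestEntry: a bucket id and the bucket count it was written with. */
    static final class Entry {
        final int bucket;
        final int totalBuckets;

        Entry(int bucket, int totalBuckets) {
            this.bucket = bucket;
            this.totalBuckets = totalBuckets;
        }
    }

    /** Old order: the only check compares specifiedBucket against the entry's total bucket count. */
    static List<Entry> filterThenCheck(List<Entry> entries, int specifiedBucket) {
        List<Entry> result = new ArrayList<>();
        for (Entry e : entries) {
            if (specifiedBucket >= e.totalBuckets) {
                throw new IllegalStateException("Bucket number has been changed.");
            }
            if (e.bucket == specifiedBucket) {
                result.add(e);
            }
        }
        return result;
    }

    /** New order: compare each entry's total bucket count with the current one, then filter. */
    static List<Entry> checkThenFilter(List<Entry> entries, int numOfBuckets, int specifiedBucket) {
        List<Entry> result = new ArrayList<>();
        for (Entry e : entries) {
            if (e.totalBuckets != numOfBuckets) {
                throw new IllegalStateException(
                        "Entry was written with " + e.totalBuckets
                                + " buckets, but the current bucket number is " + numOfBuckets);
            }
            if (e.bucket == specifiedBucket) {
                result.add(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Entry written when numOfBuckets was 2; the table now has 1 bucket, so specifiedBucket is 0.
        List<Entry> entries = Collections.singletonList(new Entry(1, 2));

        // 0 < 2 passes the old check and bucket 1 != 0, so the entry is silently dropped.
        System.out.println("filter-then-check size: " + filterThenCheck(entries, 0).size());

        try {
            checkThenFilter(entries, 1, 0);
        } catch (IllegalStateException ex) {
            // The mismatch (2 vs 1) is reported before any filtering happens.
            System.out.println("check-then-filter: " + ex.getMessage());
        }
    }
}
```

In this sketch the old ordering returns an empty list without complaint, while checking `totalBuckets` against the current bucket number first surfaces the mismatch regardless of which bucket the reader was assigned.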


