You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/24 06:50:10 UTC

[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4878: [HUDI-3465] Add validation of column stats and bloom filters in HoodieMetadataTableValidator

zhangyue19921010 commented on a change in pull request #4878:
URL: https://github.com/apache/hudi/pull/4878#discussion_r813589004



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
##########
@@ -438,27 +481,62 @@ private void validateLatestBaseFiles(HoodieTableFileSystemView metaFsView, Hoodi
   /**
    * Compare getLatestFileSlices between metadata table and fileSystem.
    */
-  private void validateLatestFileSlices(HoodieTableFileSystemView metaFsView, HoodieTableFileSystemView fsView, String partitionPath) {
+  private void validateLatestFileSlices(
+      HoodieMetadataValidationContext metadataTableBasedContext,
+      HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
 
-    List<FileSlice> latestFileSlicesFromMetadataTable = metaFsView.getLatestFileSlices(partitionPath).sorted(new FileSliceCompactor()).collect(Collectors.toList());
-    List<FileSlice> latestFileSlicesFromFS = fsView.getLatestFileSlices(partitionPath).sorted(new FileSliceCompactor()).collect(Collectors.toList());
+    List<FileSlice> latestFileSlicesFromMetadataTable = metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath);
+    List<FileSlice> latestFileSlicesFromFS = fsBasedContext.getSortedLatestFileSliceList(partitionPath);
 
-    LOG.info("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath);
-    LOG.info("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath);
+    LOG.debug("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath);
+    LOG.debug("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath);
 
-    validateFileSlice(latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath);
+    validate(latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath, "file slices");
     LOG.info("Validation of getLatestFileSlices succeeded for partition " + partitionPath);
   }
 
-  private HoodieTableFileSystemView createHoodieTableFileSystemView(HoodieSparkEngineContext engineContext, boolean enableMetadataTable) {
+  private void validateAllColumnStats(
+      HoodieMetadataValidationContext metadataTableBasedContext,
+      HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
+    List<String> latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath)
+        .stream().map(BaseFile::getFileName).collect(Collectors.toList());
+    List<HoodieColumnRangeMetadata<String>> metadataBasedColStats = metadataTableBasedContext
+        .getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
+    List<HoodieColumnRangeMetadata<String>> fsBasedColStats = fsBasedContext
+        .getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
 
-    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
-        .enable(enableMetadataTable)
-        .withAssumeDatePartitioning(cfg.assumeDatePartitioning)
-        .build();
+    validate(metadataBasedColStats, fsBasedColStats, partitionPath, "column stats");
 
-    return FileSystemViewManager.createInMemoryFileSystemView(engineContext,
-        metaClient, metadataConfig);
+    LOG.info("Validation of column stats succeeded for partition " + partitionPath);
+  }
+
+  private void validateBloomFilters(
+      HoodieMetadataValidationContext metadataTableBasedContext,
+      HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
+    List<String> latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath)
+        .stream().map(BaseFile::getFileName).collect(Collectors.toList());
+    List<BloomFilterData> metadataBasedBloomFilters = metadataTableBasedContext

Review comment:
       Same question as before regarding `latestBaseFilenameList`.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
##########
@@ -438,27 +481,62 @@ private void validateLatestBaseFiles(HoodieTableFileSystemView metaFsView, Hoodi
   /**
    * Compare getLatestFileSlices between metadata table and fileSystem.
    */
-  private void validateLatestFileSlices(HoodieTableFileSystemView metaFsView, HoodieTableFileSystemView fsView, String partitionPath) {
+  private void validateLatestFileSlices(
+      HoodieMetadataValidationContext metadataTableBasedContext,
+      HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
 
-    List<FileSlice> latestFileSlicesFromMetadataTable = metaFsView.getLatestFileSlices(partitionPath).sorted(new FileSliceCompactor()).collect(Collectors.toList());
-    List<FileSlice> latestFileSlicesFromFS = fsView.getLatestFileSlices(partitionPath).sorted(new FileSliceCompactor()).collect(Collectors.toList());
+    List<FileSlice> latestFileSlicesFromMetadataTable = metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath);
+    List<FileSlice> latestFileSlicesFromFS = fsBasedContext.getSortedLatestFileSliceList(partitionPath);
 
-    LOG.info("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath);
-    LOG.info("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath);
+    LOG.debug("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath);
+    LOG.debug("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath);
 
-    validateFileSlice(latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath);
+    validate(latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath, "file slices");
     LOG.info("Validation of getLatestFileSlices succeeded for partition " + partitionPath);
   }
 
-  private HoodieTableFileSystemView createHoodieTableFileSystemView(HoodieSparkEngineContext engineContext, boolean enableMetadataTable) {
+  private void validateAllColumnStats(
+      HoodieMetadataValidationContext metadataTableBasedContext,
+      HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
+    List<String> latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath)
+        .stream().map(BaseFile::getFileName).collect(Collectors.toList());
+    List<HoodieColumnRangeMetadata<String>> metadataBasedColStats = metadataTableBasedContext
+        .getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
+    List<HoodieColumnRangeMetadata<String>> fsBasedColStats = fsBasedContext
+        .getSortedColumnStatsList(partitionPath, latestBaseFilenameList);

Review comment:
       We get `latestBaseFilenameList` from `fsBasedContext` and then use it to compute both `metadataBasedColStats` and `fsBasedColStats`. Could we instead do
   ```
   fsBasedContext -> fileNameListFsBased -> fsBasedColStats
   metadataTableBasedContext -> fileNameListMetaBased -> metadataBasedColStats
   ```
   
   Or is it OK to reuse the same `latestBaseFilenameList`?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
##########
@@ -500,19 +578,179 @@ public int compare(FileSlice o1, FileSlice o2) {
     }
   }
 
-  public static class HoodieBaseFileCompactor implements Comparator<HoodieBaseFile>, Serializable {
+  public static class HoodieBaseFileComparator implements Comparator<HoodieBaseFile>, Serializable {
 
     @Override
     public int compare(HoodieBaseFile o1, HoodieBaseFile o2) {
       return o1.getPath().compareTo(o2.getPath());
     }
   }
 
-  public static class HoodieFileGroupCompactor implements Comparator<HoodieFileGroup>, Serializable {
+  public static class HoodieFileGroupComparator implements Comparator<HoodieFileGroup>, Serializable {
 
     @Override
     public int compare(HoodieFileGroup o1, HoodieFileGroup o2) {
       return o1.getFileGroupId().compareTo(o2.getFileGroupId());
     }
   }
-}
\ No newline at end of file
+
+  public static class HoodieColumnRangeMetadataComparator
+      implements Comparator<HoodieColumnRangeMetadata<String>>, Serializable {
+
+    @Override
+    public int compare(HoodieColumnRangeMetadata<String> o1, HoodieColumnRangeMetadata<String> o2) {
+      return o1.toString().compareTo(o2.toString());
+    }
+  }
+
+  /**
+   * Class for storing relevant information for metadata table validation.
+   * <p>
+   * If metadata table is disabled, the APIs provide the information, e.g., file listing,
+   * index, from the file system and base files.  If metadata table is enabled, the APIs
+   * provide the information from the metadata table.  The same API is expected to return
+   * the same information regardless of whether metadata table is enabled, which is
+   * verified in the {@link HoodieMetadataTableValidator}.
+   */
+  private static class HoodieMetadataValidationContext implements Serializable {
+    private HoodieTableMetaClient metaClient;
+    private HoodieTableFileSystemView fileSystemView;
+    private HoodieTableMetadata tableMetadata;
+    private boolean enableMetadataTable;
+    private List<String> allColumnNameList;
+
+    public HoodieMetadataValidationContext(
+        HoodieEngineContext engineContext, Config cfg, HoodieTableMetaClient metaClient,
+        boolean enableMetadataTable) {
+      this.metaClient = metaClient;
+      this.enableMetadataTable = enableMetadataTable;
+      HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+          .enable(enableMetadataTable)
+          .withMetadataIndexBloomFilter(enableMetadataTable)
+          .withMetadataIndexColumnStats(enableMetadataTable)
+          .withMetadataIndexForAllColumns(enableMetadataTable)
+          .withAssumeDatePartitioning(cfg.assumeDatePartitioning)
+          .build();
+      this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
+          metaClient, metadataConfig);
+      this.tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath(),
+          FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
+      if (metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() > 0) {
+        this.allColumnNameList = getAllColumnNames();
+      }
+    }
+
+    public List<HoodieBaseFile> getSortedLatestBaseFileList(String partitionPath) {
+      return fileSystemView.getLatestBaseFiles(partitionPath)
+          .sorted(new HoodieBaseFileComparator()).collect(Collectors.toList());
+    }
+
+    public List<FileSlice> getSortedLatestFileSliceList(String partitionPath) {
+      return fileSystemView.getLatestFileSlices(partitionPath)
+          .sorted(new FileSliceComparator()).collect(Collectors.toList());
+    }
+
+    public List<HoodieFileGroup> getSortedAllFileGroupList(String partitionPath) {
+      return fileSystemView.getAllFileGroups(partitionPath)
+          .sorted(new HoodieFileGroupComparator()).collect(Collectors.toList());
+    }
+
+    public List<HoodieColumnRangeMetadata<String>> getSortedColumnStatsList(
+        String partitionPath, List<String> baseFileNameList) {
+      LOG.info("All column names for getting column stats: " + allColumnNameList);
+      if (enableMetadataTable) {
+        List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream()
+            .map(filename -> Pair.of(partitionPath, filename)).collect(Collectors.toList());
+        return allColumnNameList.stream()
+            .flatMap(columnName ->
+                tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream()
+                    .map(stats -> new HoodieColumnRangeMetadata<>(
+                        stats.getFileName(),
+                        columnName,
+                        stats.getMinValue(),
+                        stats.getMaxValue(),
+                        stats.getNullCount(),
+                        stats.getValueCount(),
+                        stats.getTotalSize(),
+                        stats.getTotalUncompressedSize()))
+                    .collect(Collectors.toList())
+                    .stream())
+            .sorted(new HoodieColumnRangeMetadataComparator())
+            .collect(Collectors.toList());
+      } else {
+        return baseFileNameList.stream().flatMap(filename ->
+                new ParquetUtils().readRangeFromParquetMetadata(

Review comment:
       We use `ParquetUtils` to build `HoodieColumnRangeMetadata` in the file-system-based case,
   and `tableMetadata.getColumnStats` to build it in the metadata-table-based case.
   
   Also, I see that the API `getColumnStats` is not supported in `FileSystemBackedTableMetadata`:
   ```
     @Override
     public Map<Pair<String, String>, HoodieMetadataColumnStats> getColumnStats(final List<Pair<String, String>> partitionNameFileNameList, final String columnName)
         throws HoodieMetadataException {
       throw new HoodieMetadataException("Unsupported operation: getColumnsStats!");
     }
   ```
   Maybe we could move this logic into `FileSystemBackedTableMetadata`?
   
   This is just a suggestion; the current logic looks good to me.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
##########
@@ -500,19 +578,179 @@ public int compare(FileSlice o1, FileSlice o2) {
     }
   }
 
-  public static class HoodieBaseFileCompactor implements Comparator<HoodieBaseFile>, Serializable {
+  public static class HoodieBaseFileComparator implements Comparator<HoodieBaseFile>, Serializable {
 
     @Override
     public int compare(HoodieBaseFile o1, HoodieBaseFile o2) {
       return o1.getPath().compareTo(o2.getPath());
     }
   }
 
-  public static class HoodieFileGroupCompactor implements Comparator<HoodieFileGroup>, Serializable {
+  public static class HoodieFileGroupComparator implements Comparator<HoodieFileGroup>, Serializable {
 
     @Override
     public int compare(HoodieFileGroup o1, HoodieFileGroup o2) {
       return o1.getFileGroupId().compareTo(o2.getFileGroupId());
     }
   }
-}
\ No newline at end of file
+
+  public static class HoodieColumnRangeMetadataComparator
+      implements Comparator<HoodieColumnRangeMetadata<String>>, Serializable {
+
+    @Override
+    public int compare(HoodieColumnRangeMetadata<String> o1, HoodieColumnRangeMetadata<String> o2) {
+      return o1.toString().compareTo(o2.toString());
+    }
+  }
+
+  /**
+   * Class for storing relevant information for metadata table validation.
+   * <p>
+   * If metadata table is disabled, the APIs provide the information, e.g., file listing,
+   * index, from the file system and base files.  If metadata table is enabled, the APIs
+   * provide the information from the metadata table.  The same API is expected to return
+   * the same information regardless of whether metadata table is enabled, which is
+   * verified in the {@link HoodieMetadataTableValidator}.
+   */
+  private static class HoodieMetadataValidationContext implements Serializable {
+    private HoodieTableMetaClient metaClient;
+    private HoodieTableFileSystemView fileSystemView;
+    private HoodieTableMetadata tableMetadata;
+    private boolean enableMetadataTable;
+    private List<String> allColumnNameList;
+
+    public HoodieMetadataValidationContext(
+        HoodieEngineContext engineContext, Config cfg, HoodieTableMetaClient metaClient,
+        boolean enableMetadataTable) {
+      this.metaClient = metaClient;
+      this.enableMetadataTable = enableMetadataTable;
+      HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+          .enable(enableMetadataTable)
+          .withMetadataIndexBloomFilter(enableMetadataTable)
+          .withMetadataIndexColumnStats(enableMetadataTable)
+          .withMetadataIndexForAllColumns(enableMetadataTable)
+          .withAssumeDatePartitioning(cfg.assumeDatePartitioning)
+          .build();
+      this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
+          metaClient, metadataConfig);
+      this.tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath(),
+          FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
+      if (metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() > 0) {
+        this.allColumnNameList = getAllColumnNames();
+      }
+    }
+
+    public List<HoodieBaseFile> getSortedLatestBaseFileList(String partitionPath) {
+      return fileSystemView.getLatestBaseFiles(partitionPath)
+          .sorted(new HoodieBaseFileComparator()).collect(Collectors.toList());
+    }
+
+    public List<FileSlice> getSortedLatestFileSliceList(String partitionPath) {
+      return fileSystemView.getLatestFileSlices(partitionPath)
+          .sorted(new FileSliceComparator()).collect(Collectors.toList());
+    }
+
+    public List<HoodieFileGroup> getSortedAllFileGroupList(String partitionPath) {
+      return fileSystemView.getAllFileGroups(partitionPath)
+          .sorted(new HoodieFileGroupComparator()).collect(Collectors.toList());
+    }
+
+    public List<HoodieColumnRangeMetadata<String>> getSortedColumnStatsList(
+        String partitionPath, List<String> baseFileNameList) {
+      LOG.info("All column names for getting column stats: " + allColumnNameList);
+      if (enableMetadataTable) {
+        List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream()
+            .map(filename -> Pair.of(partitionPath, filename)).collect(Collectors.toList());
+        return allColumnNameList.stream()
+            .flatMap(columnName ->
+                tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream()
+                    .map(stats -> new HoodieColumnRangeMetadata<>(
+                        stats.getFileName(),
+                        columnName,
+                        stats.getMinValue(),
+                        stats.getMaxValue(),
+                        stats.getNullCount(),
+                        stats.getValueCount(),
+                        stats.getTotalSize(),
+                        stats.getTotalUncompressedSize()))
+                    .collect(Collectors.toList())
+                    .stream())
+            .sorted(new HoodieColumnRangeMetadataComparator())
+            .collect(Collectors.toList());
+      } else {
+        return baseFileNameList.stream().flatMap(filename ->
+                new ParquetUtils().readRangeFromParquetMetadata(
+                    metaClient.getHadoopConf(),
+                    new Path(new Path(metaClient.getBasePath(), partitionPath), filename),
+                    allColumnNameList).stream())
+            .map(rangeMetadata -> new HoodieColumnRangeMetadata<String>(
+                rangeMetadata.getFilePath(),
+                rangeMetadata.getColumnName(),
+                // Note: here we ignore the type in the validation,
+                // since column stats from metadata table store the min/max values as String
+                rangeMetadata.getMinValue().toString(),
+                rangeMetadata.getMaxValue().toString(),
+                rangeMetadata.getNullCount(),
+                rangeMetadata.getValueCount(),
+                rangeMetadata.getTotalSize(),
+                rangeMetadata.getTotalUncompressedSize()
+            ))
+            .sorted(new HoodieColumnRangeMetadataComparator())
+            .collect(Collectors.toList());
+      }
+    }
+
+    public List<BloomFilterData> getSortedBloomFilterList(
+        String partitionPath, List<String> baseFileNameList) {
+      if (enableMetadataTable) {
+        List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream()
+            .map(filename -> Pair.of(partitionPath, filename)).collect(Collectors.toList());
+        return tableMetadata.getBloomFilters(partitionFileNameList).entrySet().stream()
+            .map(entry -> BloomFilterData.builder()
+                .setPartitionPath(entry.getKey().getKey())
+                .setFilename(entry.getKey().getValue())
+                .setBloomFilter(entry.getValue())
+                .build())
+            .sorted()
+            .collect(Collectors.toList());
+      } else {
+        return baseFileNameList.stream()
+            .map(filename -> readBloomFilterFromFile(partitionPath, filename))
+            .filter(Option::isPresent)
+            .map(Option::get)
+            .collect(Collectors.toList());

Review comment:
       Do we also need to call `.sorted()` here?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
##########
@@ -500,19 +578,179 @@ public int compare(FileSlice o1, FileSlice o2) {
     }
   }
 
-  public static class HoodieBaseFileCompactor implements Comparator<HoodieBaseFile>, Serializable {
+  public static class HoodieBaseFileComparator implements Comparator<HoodieBaseFile>, Serializable {
 
     @Override
     public int compare(HoodieBaseFile o1, HoodieBaseFile o2) {
       return o1.getPath().compareTo(o2.getPath());
     }
   }
 
-  public static class HoodieFileGroupCompactor implements Comparator<HoodieFileGroup>, Serializable {
+  public static class HoodieFileGroupComparator implements Comparator<HoodieFileGroup>, Serializable {
 
     @Override
     public int compare(HoodieFileGroup o1, HoodieFileGroup o2) {
       return o1.getFileGroupId().compareTo(o2.getFileGroupId());
     }
   }
-}
\ No newline at end of file
+
+  public static class HoodieColumnRangeMetadataComparator
+      implements Comparator<HoodieColumnRangeMetadata<String>>, Serializable {
+
+    @Override
+    public int compare(HoodieColumnRangeMetadata<String> o1, HoodieColumnRangeMetadata<String> o2) {
+      return o1.toString().compareTo(o2.toString());
+    }
+  }
+
+  /**
+   * Class for storing relevant information for metadata table validation.
+   * <p>
+   * If metadata table is disabled, the APIs provide the information, e.g., file listing,
+   * index, from the file system and base files.  If metadata table is enabled, the APIs
+   * provide the information from the metadata table.  The same API is expected to return
+   * the same information regardless of whether metadata table is enabled, which is
+   * verified in the {@link HoodieMetadataTableValidator}.
+   */
+  private static class HoodieMetadataValidationContext implements Serializable {
+    private HoodieTableMetaClient metaClient;
+    private HoodieTableFileSystemView fileSystemView;
+    private HoodieTableMetadata tableMetadata;
+    private boolean enableMetadataTable;
+    private List<String> allColumnNameList;
+
+    public HoodieMetadataValidationContext(
+        HoodieEngineContext engineContext, Config cfg, HoodieTableMetaClient metaClient,
+        boolean enableMetadataTable) {
+      this.metaClient = metaClient;
+      this.enableMetadataTable = enableMetadataTable;
+      HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+          .enable(enableMetadataTable)
+          .withMetadataIndexBloomFilter(enableMetadataTable)
+          .withMetadataIndexColumnStats(enableMetadataTable)
+          .withMetadataIndexForAllColumns(enableMetadataTable)
+          .withAssumeDatePartitioning(cfg.assumeDatePartitioning)
+          .build();
+      this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
+          metaClient, metadataConfig);
+      this.tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath(),
+          FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
+      if (metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() > 0) {
+        this.allColumnNameList = getAllColumnNames();
+      }
+    }
+
+    public List<HoodieBaseFile> getSortedLatestBaseFileList(String partitionPath) {
+      return fileSystemView.getLatestBaseFiles(partitionPath)
+          .sorted(new HoodieBaseFileComparator()).collect(Collectors.toList());
+    }
+
+    public List<FileSlice> getSortedLatestFileSliceList(String partitionPath) {
+      return fileSystemView.getLatestFileSlices(partitionPath)
+          .sorted(new FileSliceComparator()).collect(Collectors.toList());
+    }
+
+    public List<HoodieFileGroup> getSortedAllFileGroupList(String partitionPath) {
+      return fileSystemView.getAllFileGroups(partitionPath)
+          .sorted(new HoodieFileGroupComparator()).collect(Collectors.toList());
+    }
+
+    public List<HoodieColumnRangeMetadata<String>> getSortedColumnStatsList(
+        String partitionPath, List<String> baseFileNameList) {
+      LOG.info("All column names for getting column stats: " + allColumnNameList);
+      if (enableMetadataTable) {
+        List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream()
+            .map(filename -> Pair.of(partitionPath, filename)).collect(Collectors.toList());
+        return allColumnNameList.stream()
+            .flatMap(columnName ->
+                tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream()
+                    .map(stats -> new HoodieColumnRangeMetadata<>(
+                        stats.getFileName(),
+                        columnName,
+                        stats.getMinValue(),
+                        stats.getMaxValue(),
+                        stats.getNullCount(),
+                        stats.getValueCount(),
+                        stats.getTotalSize(),
+                        stats.getTotalUncompressedSize()))
+                    .collect(Collectors.toList())
+                    .stream())
+            .sorted(new HoodieColumnRangeMetadataComparator())
+            .collect(Collectors.toList());
+      } else {
+        return baseFileNameList.stream().flatMap(filename ->
+                new ParquetUtils().readRangeFromParquetMetadata(
+                    metaClient.getHadoopConf(),
+                    new Path(new Path(metaClient.getBasePath(), partitionPath), filename),
+                    allColumnNameList).stream())
+            .map(rangeMetadata -> new HoodieColumnRangeMetadata<String>(
+                rangeMetadata.getFilePath(),
+                rangeMetadata.getColumnName(),
+                // Note: here we ignore the type in the validation,
+                // since column stats from metadata table store the min/max values as String
+                rangeMetadata.getMinValue().toString(),
+                rangeMetadata.getMaxValue().toString(),
+                rangeMetadata.getNullCount(),
+                rangeMetadata.getValueCount(),
+                rangeMetadata.getTotalSize(),
+                rangeMetadata.getTotalUncompressedSize()
+            ))
+            .sorted(new HoodieColumnRangeMetadataComparator())
+            .collect(Collectors.toList());
+      }
+    }
+
+    public List<BloomFilterData> getSortedBloomFilterList(
+        String partitionPath, List<String> baseFileNameList) {
+      if (enableMetadataTable) {
+        List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream()
+            .map(filename -> Pair.of(partitionPath, filename)).collect(Collectors.toList());
+        return tableMetadata.getBloomFilters(partitionFileNameList).entrySet().stream()
+            .map(entry -> BloomFilterData.builder()
+                .setPartitionPath(entry.getKey().getKey())
+                .setFilename(entry.getKey().getValue())
+                .setBloomFilter(entry.getValue())
+                .build())
+            .sorted()
+            .collect(Collectors.toList());
+      } else {
+        return baseFileNameList.stream()
+            .map(filename -> readBloomFilterFromFile(partitionPath, filename))

Review comment:
       Same as with `getColumnStats`: `getBloomFilters` also throws an unsupported-operation exception:
   ```
     @Override
     public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList)
         throws HoodieMetadataException {
       throw new HoodieMetadataException("Unsupported operation: getBloomFilters!");
     }
   ```
   
   If we could unify the API, we wouldn't need to check `if (enableMetadataTable)` here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org