You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/07 10:50:36 UTC

[GitHub] [hudi] codope commented on a change in pull request #4746: [HUDI-3356][HUDI-3142] Metadata index initialization for bloom filters and column stats partitions

codope commented on a change in pull request #4746:
URL: https://github.com/apache/hudi/pull/4746#discussion_r800530826



##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -667,34 +660,60 @@ private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTi
   }
 
   /**
-   * Convert rollback action metadata to column stats index records.
+   * Convert added and deleted action metadata to column stats index records.
    */
-  private static List<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
-                                                                     HoodieTableMetaClient datasetMetaClient,
-                                                                     Map<String, List<String>> partitionToDeletedFiles,
-                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                                                     String instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    List<String> latestColumns = getLatestColumns(datasetMetaClient);
-    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
+  public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
+                                                                          Map<String, List<String>> partitionToDeletedFiles,
+                                                                          Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                          MetadataRecordsGenerationParams recordsGenerationParams) {
+    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+    final List<String> latestColumns = getLatestColumns(recordsGenerationParams.getDataMetaClient());
+
+    final List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
+        .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
+    final HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList,
+        Math.max(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> {
+      final String partitionName = partitionToDeletedFilesEntry.getLeft();
       final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-      if (deletedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+      final List<String> deletedFileList = partitionToDeletedFilesEntry.getRight();
+
+      return deletedFileList.stream().flatMap(deletedFile -> {
+        if (!FSUtils.isBaseFile(new Path(deletedFile)) || !deletedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+          return Stream.empty();
+        }
         final String filePathWithPartition = partitionName + "/" + deletedFile;
-        records.addAll(getColumnStats(partition, filePathWithPartition, datasetMetaClient,
-            latestColumns, true).collect(Collectors.toList()));
-      }
-    }));
-
-    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> appendedFileMap.forEach(
-        (appendedFile, size) -> {
-          final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-          if (appendedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-            final String filePathWithPartition = partitionName + "/" + appendedFile;
-            records.addAll(getColumnStats(partition, filePathWithPartition, datasetMetaClient,
-                latestColumns, false).collect(Collectors.toList()));
-          }
-        }));
-    return records;
+        return getColumnStats(partition, filePathWithPartition, recordsGenerationParams.getDataMetaClient(),
+            latestColumns, true);
+      }).iterator();
+    });
+    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
+
+    final List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet()
+        .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+    final HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList,
+        Math.max(partitionToAppendedFiles.size(), recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesEntry -> {
+      final String partitionName = partitionToAppendedFilesEntry.getLeft();
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+      final Map<String, Long> appendedFileMap = partitionToAppendedFilesEntry.getRight();
+
+      return appendedFileMap.entrySet().stream().flatMap(appendedFileNameLengthPair -> {
+        if (!FSUtils.isBaseFile(new Path(appendedFileNameLengthPair.getKey()))
+            || !appendedFileNameLengthPair.getKey().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {

Review comment:
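      I think we should extract this if-condition into a helper method (e.g. a predicate deciding whether column stats can be collected for a given file). Right now we only collect stats from base Parquet files, but that may change in the future, and a single helper would keep the deleted-files and appended-files paths consistent.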




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org