Posted to commits@hudi.apache.org by si...@apache.org on 2023/02/04 01:59:27 UTC

[hudi] branch master updated: [HUDI-5496] Avoid unnecessary file system parsing to initialize metadata table for a new data table (#7841)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cdfacba1bd [HUDI-5496] Avoid unnecessary file system parsing to initialize metadata table for a new data table (#7841)
5cdfacba1bd is described below

commit 5cdfacba1bddfddb7b2b6a75a3ff0e3b6766392b
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Fri Feb 3 17:59:18 2023 -0800

    [HUDI-5496] Avoid unnecessary file system parsing to initialize metadata table for a new data table (#7841)
    
    - Optimize instantiation of the metadata table for a fresh table by avoiding the file system listing
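
    In sketch form (not the exact Hudi code), the fast path amounts to checking the metadata
    table's initial commit time against the SOLO_COMMIT_TIMESTAMP sentinel before falling back
    to a full file system listing. buildRecordsFromListing below is a hypothetical helper that
    stands in for the per-partition record generation shown in the diff:

        // Sketch of the fresh-table fast path, assuming SOLO_COMMIT_TIMESTAMP is the sentinel
        // initial commit time used when the data table has no completed commits yet.
        Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
        if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) {
          // Fresh data table: nothing to list, so bootstrap the metadata table with an empty commit.
          LOG.info("Triggering empty commit to initialize the metadata table");
        } else {
          // Existing data table: list the file system once to seed the metadata partitions.
          List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
          partitionToRecordsMap.putAll(buildRecordsFromListing(createInstantTime, partitionInfoList));
        }
        commit(createInstantTime, partitionToRecordsMap, false);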
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  2 +
 .../metadata/HoodieBackedTableMetadataWriter.java  | 64 ++++++++++++----------
 .../SparkHoodieBackedTableMetadataWriter.java      |  1 +
 .../internal/HoodieDataSourceInternalWriter.java   |  4 +-
 4 files changed, 39 insertions(+), 32 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index c3260914bd5..17956479762 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -519,6 +519,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    */
   protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
     try {
+      context.setJobStatus(this.getClass().getSimpleName(), "Cleaning up marker directories for commit " + instantTime + " in table "
+          + config.getTableName());
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index a8356ff9c71..5e8367e2095 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1086,39 +1086,45 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
 
     Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
 
-    List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
-    Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
-        .map(p -> {
-          String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
-          return Pair.of(partitionName, p.getFileNameToSizeMap());
-        })
-        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-    int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
-    List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
-    if (partitionTypes.contains(MetadataPartitionType.FILES)) {
-      // Record which saves the list of all partitions
-      HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
-      HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
-      ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
-      partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
-    }
+    // Skip the file system listing used to populate metadata records if it's a fresh table.
+    // The listing is needed only if the data table already has N commits and the metadata table is enabled at a later point in time.
+    if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // SOLO_COMMIT_TIMESTAMP is the initial commit time in the MDT for a fresh table.
+      // Otherwise, the last completed commit in the data table is chosen as the initial commit time.
+      LOG.info("Triggering empty commit to initialize the metadata table");
+    } else {
+      List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
+      Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
+          .map(p -> {
+            String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+            return Pair.of(partitionName, p.getFileNameToSizeMap());
+          })
+          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+      int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
+      List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
+
+      if (partitionTypes.contains(MetadataPartitionType.FILES)) {
+        // Record which saves the list of all partitions
+        HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
+        HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
+        ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
+        partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
+      }
 
-    if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) {
-      final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-          engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
-      partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD);
-    }
+      if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) {
+        final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+            engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
+        partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD);
+      }
 
-    if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) {
-      final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-          engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
-      partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
+      if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) {
+        final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+            engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
+        partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
+      }
+      LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata");
     }
 
-    LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata");
-
     commit(createInstantTime, partitionToRecordsMap, false);
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 81526c25bcc..23537f6f798 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -133,6 +133,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
     JavaRDD<HoodieRecord> preppedRecordRDD = HoodieJavaRDD.getJavaRDD(preppedRecords);
 
+    engineContext.setJobStatus(this.getClass().getName(), "Committing " + instantTime + " to metadata table " + metadataWriteConfig.getTableName());
     try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig)) {
       // rollback partially failed writes if any.
       if (writeClient.rollbackFailedWrites()) {
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
index c4b21483e8f..11f5d5030b4 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
@@ -34,7 +34,6 @@ import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -51,7 +50,6 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
   private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
   private final boolean populateMetaFields;
   private final Boolean arePartitionRecordsSorted;
-  private Map<String, String> extraMetadataMap = new HashMap<>();
 
   public HoodieDataSourceInternalWriter(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
                                         SparkSession sparkSession, Configuration configuration, DataSourceOptions dataSourceOptions,
@@ -61,7 +59,7 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
     this.structType = structType;
     this.populateMetaFields = populateMetaFields;
     this.arePartitionRecordsSorted = arePartitionRecordsSorted;
-    this.extraMetadataMap = DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap());
+    Map<String, String> extraMetadataMap = DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap());
     this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
         sparkSession, configuration, extraMetadataMap);
   }
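
A smaller change in the BaseHoodieWriteClient and SparkHoodieBackedTableMetadataWriter hunks above is the
setJobStatus call added just before the distributed work, so the resulting Spark stages carry a readable
label in the UI. Roughly, the pattern looks like the following (the status text is illustrative; context,
config, table and instantTime are assumed to be in scope as in the diff):

    // Tag the work that follows with a module name and a human-readable description.
    context.setJobStatus(this.getClass().getSimpleName(),
        "Cleaning up marker directories for commit " + instantTime + " in table " + config.getTableName());
    // The marker-directory cleanup triggered next is then attributed to that status in the Spark UI.
    WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());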