You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/04 05:53:12 UTC
[hudi] 07/07: [HUDI-5496] Avoid unnecessary file system parsing to initialize metadata table for a new data table (#7841)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4286d7982374809e5517852adfcdcd5bba5619ec
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Fri Feb 3 17:59:18 2023 -0800
[HUDI-5496] Avoid unnecessary file system parsing to initialize metadata table for a new data table (#7841)
- Optimizing instantiation of metadata table for a fresh table by avoiding file listing
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 2 +
.../metadata/HoodieBackedTableMetadataWriter.java | 64 ++++++++++++----------
.../SparkHoodieBackedTableMetadataWriter.java | 1 +
.../internal/HoodieDataSourceInternalWriter.java | 4 +-
4 files changed, 39 insertions(+), 32 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index c3260914bd5..17956479762 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -519,6 +519,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
*/
protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
try {
+ context.setJobStatus(this.getClass().getSimpleName(),"Cleaning up marker directories for commit " + instantTime + " in table "
+ + config.getTableName());
// Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index a8356ff9c71..5e8367e2095 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1086,39 +1086,45 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
- List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
- Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
- .map(p -> {
- String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
- return Pair.of(partitionName, p.getFileNameToSizeMap());
- })
- .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
- int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
- List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
- if (partitionTypes.contains(MetadataPartitionType.FILES)) {
- // Record which saves the list of all partitions
- HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
- HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
- ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
- partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
- }
+    // skip file system listing to populate metadata records if it's a fresh table.
+ // this is applicable only if the table already has N commits and metadata is enabled at a later point in time.
+ if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table.
+ // If not, last completed commit in data table will be chosen as the initial commit time.
+ LOG.info("Triggering empty Commit to metadata to initialize");
+ } else {
+ List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
+ Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
+ .map(p -> {
+ String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+ return Pair.of(partitionName, p.getFileNameToSizeMap());
+ })
+ .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+ int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
+ List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
+
+ if (partitionTypes.contains(MetadataPartitionType.FILES)) {
+ // Record which saves the list of all partitions
+ HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
+ HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
+ ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
+ partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
+ }
- if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) {
- final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
- engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
- partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD);
- }
+ if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) {
+ final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+ engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
+ partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD);
+ }
- if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) {
- final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
- engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
- partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
+ if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) {
+ final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+ engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
+ partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
+ }
+ LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata");
}
- LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata");
-
commit(createInstantTime, partitionToRecordsMap, false);
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 81526c25bcc..23537f6f798 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -133,6 +133,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
JavaRDD<HoodieRecord> preppedRecordRDD = HoodieJavaRDD.getJavaRDD(preppedRecords);
+ engineContext.setJobStatus(this.getClass().getName(), "Committing " + instantTime + " to metadata table " + metadataWriteConfig.getTableName());
try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig)) {
// rollback partially failed writes if any.
if (writeClient.rollbackFailedWrites()) {
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
index c4b21483e8f..11f5d5030b4 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
@@ -34,7 +34,6 @@ import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -51,7 +50,6 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
private final boolean populateMetaFields;
private final Boolean arePartitionRecordsSorted;
- private Map<String, String> extraMetadataMap = new HashMap<>();
public HoodieDataSourceInternalWriter(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
SparkSession sparkSession, Configuration configuration, DataSourceOptions dataSourceOptions,
@@ -61,7 +59,7 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
this.structType = structType;
this.populateMetaFields = populateMetaFields;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
- this.extraMetadataMap = DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap());
+ Map<String, String> extraMetadataMap = DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap());
this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
sparkSession, configuration, extraMetadataMap);
}