Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/16 09:35:10 UTC

[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #5581: [HUDI-53] Implementation of a native DFS based index based on the metadata table.

prasannarajaperumal commented on code in PR #5581:
URL: https://github.com/apache/hudi/pull/5581#discussion_r972752821


##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java:
##########
@@ -129,50 +131,18 @@ public String create() throws IOException {
   }
 
   @CliCommand(value = "metadata delete", help = "Remove the Metadata Table")
-  public String delete() throws Exception {
-    HoodieCLI.getTableMetaClient();
-    Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
-    try {
-      FileStatus[] statuses = HoodieCLI.fs.listStatus(metadataPath);
-      if (statuses.length > 0) {
-        HoodieCLI.fs.delete(metadataPath, true);
-      }
-    } catch (FileNotFoundException e) {
-      // Metadata directory does not exist
-    }
-
-    return String.format("Removed Metadata Table from %s", metadataPath);
-  }
-
-  @CliCommand(value = "metadata init", help = "Update the metadata table from commits since the creation")
-  public String init(@CliOption(key = {"readonly"}, unspecifiedDefaultValue = "false",

Review Comment:
   Is init being removed?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java:
##########
@@ -172,13 +173,14 @@ public boolean commit(String instantTime, O writeStatuses, Option<Map<String, St
   public abstract boolean commit(String instantTime, O writeStatuses, Option<Map<String, String>> extraMetadata,
                                  String commitActionType, Map<String, List<String>> partitionToReplacedFileIds);
 
-  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
-                             String commitActionType) {
-    return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap());
+  public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStatuses, List<HoodieWriteStat> stats,

Review Comment:
   Just clarifying: WriteStatus contains only deflated HoodieRecords, right? I want to make sure we don't hold on to the data in memory until we update the metadata table.
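   For illustration, here is a minimal sketch of what "deflated" would mean here, assuming the write handle drops the record payload after a successful write and WriteStatus keeps only the key and location. SketchRecord and SketchWriteStatus are hypothetical stand-ins for HoodieRecord and WriteStatus, not the real Hudi classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-ins for HoodieRecord and WriteStatus (not the real Hudi classes).
final class SketchRecord<T> {
  private final String recordKey;
  private T data;          // payload; null once the record has been deflated
  private String fileId;   // location (file group) assigned by the write handle

  SketchRecord(String recordKey, T data) {
    this.recordKey = recordKey;
    this.data = data;
  }

  void setLocation(String fileId) {
    this.fileId = fileId;
  }

  /** Drops the payload so only the key and location stay in memory. */
  void deflate() {
    this.data = null;
  }

  String getRecordKey() { return recordKey; }
  String getFileId() { return fileId; }
  T getData() { return data; }
}

final class SketchWriteStatus {
  private final List<SketchRecord<?>> writtenRecords = new ArrayList<>();

  /** Tracks a successfully written record, keeping only its deflated form. */
  void markSuccess(SketchRecord<?> record) {
    record.deflate();
    writtenRecords.add(record);
  }

  List<SketchRecord<?>> getWrittenRecords() {
    return Collections.unmodifiableList(writtenRecords);
  }
}
```

   If WriteStatus behaves like this, the metadata writer only needs key to location mappings to update the record index, so the payloads can be released as soon as they are written.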



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -434,6 +434,15 @@ public class HoodieWriteConfig extends HoodieConfig {
       .sinceVersion("0.10.0")
       .withDocumentation("File Id Prefix provider class, that implements `org.apache.hudi.fileid.FileIdPrefixProvider`");
 
+  /**
+   * If a valid location is specified, a copy of the write config is saved before each operation.
+   */
+  public static final ConfigProperty<String> CONFIG_EXPORT_DIR = ConfigProperty

Review Comment:
   It's not clear where this is used. Are write configs persisted after each operation?
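   Going only by the Javadoc ("a copy of the write config is saved before each operation"), one possible shape for the export is sketched below. WriteConfigExporter, exportBeforeOperation and the "<instantTime>.<operation>.properties" file name are all hypothetical; the PR's actual use of CONFIG_EXPORT_DIR is not visible in this hunk.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper illustrating "save a copy of the write config before each operation".
final class WriteConfigExporter {
  private WriteConfigExporter() {}

  /**
   * Writes writeProps into exportDir before an operation starts.
   * Returns the exported file, or empty when no valid export directory is configured.
   */
  static Optional<Path> exportBeforeOperation(Properties writeProps, String exportDir,
      String instantTime, String operation) throws IOException {
    if (exportDir == null || exportDir.isEmpty()) {
      return Optional.empty(); // export disabled
    }
    Path dir = Paths.get(exportDir);
    if (!Files.isDirectory(dir)) {
      return Optional.empty(); // not a valid location, so skip the export
    }
    // Hypothetical naming scheme: one file per instant and operation.
    Path target = dir.resolve(instantTime + "." + operation + ".properties");
    try (Writer out = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {
      writeProps.store(out, "Write config for " + operation + " at " + instantTime);
    }
    return Optional.of(target);
  }
}
```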



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -573,25 +728,16 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
   }
 
   /**
-   * Interface to assist in converting commit metadata to List of HoodieRecords to be written to metadata table.
-   * Updates of different commit metadata uses the same method to convert to HoodieRecords and hence.
-   */
-  private interface ConvertMetadataFunction {
-    List<HoodieRecord> convertMetadata();
-  }
-
-  /**
-   * Processes commit metadata from data table and commits to metadata table.
-   * @param instantTime instant time of interest.
-   * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table.
-   * @param <T> type of commit metadata.
-   * @param canTriggerTableService true if table services can be triggered. false otherwise.
+   * Commits updates to the file listing partition.
+   *
+   * @param instantTime instant time to use for the commit
+   * @param recordList List of HoodieRecords to be written to file listing partition.
+   * @param canTriggerTableService true if table service can be triggered, false otherwise.
    */
-  private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
-      commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
-    }
+  private void commitFileListingUpdate(String instantTime, List<HoodieRecord> recordList, boolean canTriggerTableService) {

Review Comment:
   Thanks for renaming this to make it clearer.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -470,10 +520,116 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
     // During bootstrap, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out of these
     // large number of files and calling the existing update(HoodieCommitMetadata) function does not scale well.
     // Hence, we have a special commit just for the bootstrap scenario.
-    bootstrapCommit(dirInfoList, createInstantTime);
+    bootstrapCommit(dirInfoList, createInstantTime, 1);
+
+    final long totalFiles = dirInfoList.stream().mapToLong(d -> d.getTotalFiles()).sum();
+
+    metrics.ifPresent(m -> m.incrementMetric(HoodieMetadataMetrics.INITIALIZE_FILE_LISTING_TIME_STR, timer.endTimer()));
+    metrics.ifPresent(m -> m.incrementMetric(HoodieMetadataMetrics.READ_FILES_COUNT_STR, totalFiles));
+    return true;
+  }
+
+  /**
+   * Bootstrap the record index.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} for the dataset
+   * @param createInstantTime InstantTime to use for the commit
+   * @param inflightInstantTimestamp
+   * @param partitions List of partitions from which the data files are to be read
+   */
+  private boolean bootstrapRecordLevelIndex(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient,
+      String createInstantTime, Option<String> inflightInstantTimestamp) throws IOException {
+    ValidationUtils.checkState(enabled, "Record level index cannot be initialized as Metadata Table is not enabled");
+    ValidationUtils.checkState(dataWriteConfig.createRecordIndex(),
+        "Record level index cannot be initialized as it is not enabled");
+
+    // Starting two timers to time reading of keys and total time to bootstrap
+    HoodieTimer timer = new HoodieTimer().startTimer().startTimer();
+
+    // Collect the list of base files present
+    final List<String> partitions = metadata.getAllPartitionPaths();
+    final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient,
+        dataMetaClient.getActiveTimeline(), metadata);
+    final List<Pair<String, String>> partitionBaseFilePairs = new ArrayList<>();
+    for (String partition : partitions) {
+      partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
+          // ignore base files being created due to the inflight operation
+          .filter(baseFile -> !inflightInstantTimestamp.isPresent()
+              || !baseFile.getCommitTime().equals(inflightInstantTimestamp.get()))
+          .map(basefile -> Pair.of(partition, basefile.getFileName())).collect(Collectors.toList()));
+    }
+
+    LOG.info("Initializing record index from " + partitionBaseFilePairs.size() + " base files in "
+        + partitions.size() + " partitions");
+
+    // Collect record keys from the files in parallel
+    HoodieData<HoodieRecord> records = readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs);
+    records.persist("MEMORY_AND_DISK_SER");
+    final long recordCount = records.count();
+
+    // Initialize the file groups
+    final int fileGroupCount = estimateFileGroupCount(MetadataPartitionType.RECORD_INDEX.partitionPath(), recordCount,
+        RECORD_INDEX_AVERAGE_RECORD_SIZE, dataWriteConfig.getRecordIndexMinFileGroupCount(),
+        dataWriteConfig.getRecordIndexMaxFileGroupCount(), dataWriteConfig.getRecordIndexGrowthFactor());
+    initializeFileGroups(MetadataPartitionType.RECORD_INDEX, createInstantTime, fileGroupCount);
+    metrics.ifPresent(m -> m.incrementMetric(HoodieMetadataMetrics.STAT_COUNT_FILE_GROUP, fileGroupCount));
+
+    if (recordCount > 0) {
+      LOG.info("Initializing record index with " + recordCount + " mappings");
+      metrics.ifPresent(m -> m.incrementMetric(HoodieMetadataMetrics.READ_RECORDKEYS_TIME_STR, timer.endTimer()));
+      metrics.ifPresent(m -> m.incrementMetric(HoodieMetadataMetrics.READ_RECORDKEYS_COUNT_STR, recordCount));
+
+      // tag and commit records
+      records = tagRecordsWithLocation(records, MetadataPartitionType.RECORD_INDEX.partitionPath());
+      commit(records, createInstantTime, MetadataPartitionType.RECORD_INDEX.partitionPath(), fileGroupCount);
+    }
+
+    metrics.ifPresent(m -> m.incrementMetric(HoodieMetadataMetrics.INITIALIZE_RECORD_INDEX_TIME_STR, timer.endTimer()));
     return true;
   }
 
+  /**
+   * Estimates the file group count to use for a partition.
+   *
+   * @param partitionName Name of the partition for which the file group count is to be estimated.
+   * @param recordCount The number of records expected to be written.
+   * @param averageRecordSize Average size of each record to be written.
+   * @param minFileGroupCount Minimum number of file groups to use.
+   * @param maxFileGroupCount Maximum number of file groups to use.
+   * @param growthFactor By what factor are the records (recordCount) expected to grow?
+   * @param maxFileGroupSizeBytes Maximum size of the file group.
+   * @return The estimated number of file groups.
+   */
+  private int estimateFileGroupCount(String partitionName, long recordCount, int averageRecordSize, int minFileGroupCount,

Review Comment:
   Is the plan to estimate the file group count for the other metadata partitions (bloom filters, column stats) with a similar heuristic?
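   The body of estimateFileGroupCount is not shown in this hunk, so the sketch below only illustrates one heuristic consistent with its Javadoc parameters: size the partition for the grown record count, divide by the maximum file group size, and clamp to the configured range. The float type of growthFactor and the exact formula are assumptions.

```java
// Hypothetical heuristic matching the documented parameters of estimateFileGroupCount.
final class FileGroupCountEstimator {
  private FileGroupCountEstimator() {}

  static int estimateFileGroupCount(long recordCount, int averageRecordSize, int minFileGroupCount,
      int maxFileGroupCount, float growthFactor, long maxFileGroupSizeBytes) {
    if (recordCount <= 0 || averageRecordSize <= 0) {
      return minFileGroupCount; // nothing to size yet
    }
    // Size for the records expected after growth, not just today's count.
    double expectedBytes = (double) recordCount * growthFactor * averageRecordSize;
    long needed = (long) Math.ceil(expectedBytes / maxFileGroupSizeBytes);
    // Clamp into the configured [min, max] range.
    return (int) Math.max(minFileGroupCount, Math.min(maxFileGroupCount, needed));
  }
}
```

   For example, 1,000,000 records of about 48 bytes with a growth factor of 2 and a 10 MiB cap per file group gives ceil(96,000,000 / 10,485,760) = 10 file groups. A bloom filter or column stats partition could reuse the same function with its own average record size and bounds.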



##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -42,7 +43,16 @@ public String getFileIdPrefix() {
     return fileIdPrefix;
   }
 
-  public static List<String> all() {
-    return Arrays.asList(MetadataPartitionType.FILES.partitionPath());
+  public static List<MetadataPartitionType> all() {
+    return Arrays.asList(MetadataPartitionType.FILES, MetadataPartitionType.RECORD_INDEX);
+  }
+
+  /**
+   * Returns the list of metadata table partitions which require WriteStatus to track written records.
+   *
+   * These partitions need the list of written records so that they can update their metadata.
+   */
+  public static List<MetadataPartitionType> needWriteStatusTracking() {
+    return Arrays.asList(MetadataPartitionType.RECORD_INDEX);

Review Comment:
   nit: Could this be simplified to a boolean that checks whether it's RECORD_INDEX?
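   A minimal sketch of the suggested simplification, using a hypothetical stand-in enum (SketchPartitionType, with illustrative partition paths) rather than the real MetadataPartitionType: each partition type answers the question itself instead of callers scanning a list.

```java
// Hypothetical stand-in for MetadataPartitionType; partition paths are illustrative only.
enum SketchPartitionType {
  FILES("files"),
  RECORD_INDEX("record_index");

  private final String partitionPath;

  SketchPartitionType(String partitionPath) {
    this.partitionPath = partitionPath;
  }

  String partitionPath() {
    return partitionPath;
  }

  /** Only the record index needs WriteStatus to track written records. */
  boolean needsWriteStatusTracking() {
    return this == RECORD_INDEX;
  }
}
```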



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -116,8 +117,15 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
     this.writeSchema = overriddenSchema.orElseGet(() -> getWriteSchema(config));
     this.writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField());
     this.timer = new HoodieTimer().startTimer();
-    this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
-        !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
+
+    // We need to track written records within WriteStatus in two cases:
+    // 1. When the HoodieIndex being used is not implicit with storage
+    // 2. If any of the metadata table partitions (record index, etc) which require written record tracking are enabled
+    final boolean trackSuccessRecords = !hoodieTable.getIndex().isImplicitWithStorage()

Review Comment:
   nit: move the !hoodieTable.getIndex().isImplicitWithStorage() check into HoodieTableMetadataUtil.needsWriteStatusTracking as well.
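   A sketch of what the combined check could look like, so HoodieWriteHandle only makes one call. The parameter list (a boolean for the index plus the set of enabled metadata partitions) is an assumption; the real signature of HoodieTableMetadataUtil.needsWriteStatusTracking is not shown in this diff.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical helper folding both tracking conditions into one place.
final class WriteStatusTrackingSketch {
  enum MetadataPartition { FILES, RECORD_INDEX }

  /** Metadata partitions that need the keys and locations of written records. */
  private static final Set<MetadataPartition> NEEDS_TRACKING = EnumSet.of(MetadataPartition.RECORD_INDEX);

  private WriteStatusTrackingSketch() {}

  static boolean needsWriteStatusTracking(boolean indexImplicitWithStorage,
      Set<MetadataPartition> enabledPartitions) {
    // Case 1: an index that is not implicit with storage must be updated from the written records.
    if (!indexImplicitWithStorage) {
      return true;
    }
    // Case 2: an enabled metadata partition (e.g. the record index) needs the written records.
    for (MetadataPartition partition : enabledPartitions) {
      if (NEEDS_TRACKING.contains(partition)) {
        return true;
      }
    }
    return false;
  }
}
```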



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -174,6 +185,16 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
     int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep());
     int maxCommitsToKeep = Math.max(writeConfig.getMetadataMaxCommitsToKeep(), writeConfig.getMaxCommitsToKeep());
 
+    // File groups in each partition are fixed at creation time and we do not want them to be split into multiple files
+    // ever. Hence we use a very large basefile size in metadata table. The actual size of the HFiles created will
+    // eventually depend on the number of file groups selected for each partition (See estimateFileGroupCount function)
+    final long maxHFileSizeBytes = 10 * 1024 * 1024 * 1024L; // 10GB

Review Comment:
   Does the mapping of row key to file group break down if the HFile actually reaches this size and a new base file gets created? Should we just disable the max file size check here?
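   To make the concern concrete, here is a sketch assuming keys are assigned to file groups by hashing modulo a fixed file group count. The PR's actual mapping function is not shown in this diff; String.hashCode with Math.floorMod is a stand-in. Under that assumption the index depends only on the key and the count, so a new base file inside the same file group would not change it, but a split into additional file groups would. The constant also shows that 10 * 1024 * 1024 * 1024L evaluates to 10,737,418,240 bytes without int overflow, since the int prefix 10 * 1024 * 1024 still fits in an int before the final multiplication by a long.

```java
// Hypothetical key-to-file-group mapping; the real function in the PR may differ.
final class RecordKeyToFileGroup {
  // Same expression as the diff: the L suffix on the last literal makes the final multiply a long.
  static final long MAX_HFILE_SIZE_BYTES = 10 * 1024 * 1024 * 1024L; // 10 GiB

  private RecordKeyToFileGroup() {}

  /** Maps a record key to one of fileGroupCount file groups. */
  static int fileGroupIndex(String recordKey, int fileGroupCount) {
    // String.hashCode is specified by the Java API, so it is stable across JVMs;
    // floorMod keeps the result in [0, fileGroupCount) even for negative hashes.
    return Math.floorMod(recordKey.hashCode(), fileGroupCount);
  }
}
```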


