Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/11/01 19:21:01 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #3873: [HUDI-2634] Improved the metadata table bootstrap for very large tables.

nsivabalan commented on a change in pull request #3873:
URL: https://github.com/apache/hudi/pull/3873#discussion_r740454888



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -645,4 +612,83 @@ protected void doClean(AbstractHoodieWriteClient writeClient, String instantTime
     // metadata table.
     writeClient.clean(instantTime + "002");
   }
+
+  /**
+   * Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit.
+   *
+   */
+  protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime);
+
+  /**
+   * Commit the partition to file listing information to Metadata Table as a new delta-commit.
+   *
+   */
+  protected abstract void commit(List<DirectoryInfo> dirInfoList, String createInstantTime);
+
+
+  /**
+   * A class which represents a directory and the files and directories inside it.
+   *
+   * A {@code PartitionFileInfo} object saves the name of the partition and various properties requires of each file
+   * required for bootstrapping the metadata table. Saving limited properties reduces the total memory footprint when
+   * a very large number of files are present in the dataset being bootstrapped.
+   */
+  public static class DirectoryInfo implements Serializable {
+    // Relative path of the directory (relative to the base directory)
+    private String relativePath;
+    // List of filenames within this partition
+    private List<String> filenames;
+    // Length of the various files
+    private List<Long> filelengths;
+    // List of directories within this partition
+    private List<Path> subdirs = new ArrayList<>();
+    // Is this a HUDI partition
+    private boolean isPartition = false;
+
+    public DirectoryInfo(String relativePath, FileStatus[] fileStatus) {
+      this.relativePath = relativePath;
+
+      // Pre-allocate with the maximum length possible
+      filenames = new ArrayList<>(fileStatus.length);
+      filelengths = new ArrayList<>(fileStatus.length);
+
+      for (FileStatus status : fileStatus) {
+        if (status.isDirectory()) {
+          this.subdirs.add(status.getPath());
+        } else if (status.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+          // Presence of partition meta file implies this is a HUDI partition
+          this.isPartition = true;
+        } else if (FSUtils.isDataFile(status.getPath())) {
+          // Regular HUDI data file (base file or log file)
+          filenames.add(status.getPath().getName());
+          filelengths.add(status.getLen());
+        }
+      }
+    }
+
+    public String getRelativePath() {
+      return relativePath;
+    }
+
+    public int getTotalFiles() {
+      return filenames.size();
+    }
+
+    public boolean isPartition() {
+      return isPartition;
+    }
+
+    public List<Path> getSubdirs() {
+      return subdirs;
+    }
+
+    // Returns a map of filenames mapped to their lengths
+    public Map<String, Long> getFileMap() {

Review comment:
       If the caller is always going to be interested in a map of file name to length, can we populate the map directly in the constructor of DirectoryInfo() instead of keeping two separate lists?
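
A minimal sketch of what this suggestion could look like, assuming the map is the only view callers need. `FileEntry` and `DirectoryInfoSketch` are hypothetical names used for illustration; `FileEntry` merely stands in for Hadoop's `FileStatus` so the example compiles on its own, and the partition meta file and data-file checks from the patch are left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Hadoop's FileStatus (name, length, directory flag).
class FileEntry {
  final String name;
  final long length;
  final boolean directory;

  FileEntry(String name, long length, boolean directory) {
    this.name = name;
    this.length = length;
    this.directory = directory;
  }
}

// Hypothetical variant of DirectoryInfo that fills the map in the constructor.
class DirectoryInfoSketch {
  // Map of file name to file length, built once while scanning the listing
  private final Map<String, Long> filenameToSizeMap;

  DirectoryInfoSketch(FileEntry[] entries) {
    // Pre-size the map so it does not need to rehash while being filled
    filenameToSizeMap = new HashMap<>(Math.max(16, (int) (entries.length / 0.75f) + 1));
    for (FileEntry entry : entries) {
      if (!entry.directory) {
        filenameToSizeMap.put(entry.name, entry.length);
      }
    }
  }

  // Returns the map directly; no second pass over parallel lists is needed
  Map<String, Long> getFileMap() {
    return filenameToSizeMap;
  }

  int getTotalFiles() {
    return filenameToSizeMap.size();
  }
}
```

One trade-off to weigh: a `HashMap` carries more per-entry overhead than two `ArrayList`s, which may matter given that the stated goal of the class is a small memory footprint.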

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -419,52 +394,53 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
    * @param dataMetaClient
    * @return Map of partition names to a list of FileStatus for all the files in the partition
    */
-  private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
+  private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient datasetMetaClient) {
     List<Path> pathsToList = new LinkedList<>();
     pathsToList.add(new Path(dataWriteConfig.getBasePath()));
 
-    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+    List<DirectoryInfo> foundPartitionsList = new LinkedList<>();
     final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
     SerializableConfiguration conf = new SerializableConfiguration(dataMetaClient.getHadoopConf());
     final String dirFilterRegex = dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
+    final String datasetBasePath = dataMetaClient.getBasePath();
 
     while (!pathsToList.isEmpty()) {
-      int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
+      // In each round we will list a section of directories
+      int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
       // List all directories in parallel
-      List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
+      List<DirectoryInfo> foundDirsList = engineContext.map(pathsToList.subList(0,  numDirsToList), path -> {
         FileSystem fs = path.getFileSystem(conf.get());
-        return Pair.of(path, fs.listStatus(path));
-      }, listingParallelism);
-      pathsToList.clear();
+        String relativeDirPath = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), path);
+        return new DirectoryInfo(relativeDirPath, fs.listStatus(path));
+      }, numDirsToList);
+
+      pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, pathsToList.size()));
 
       // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
       // the results.
-      dirToFileListing.forEach(p -> {
-        if (!dirFilterRegex.isEmpty() && p.getLeft().getName().matches(dirFilterRegex)) {
-          LOG.info("Ignoring directory " + p.getLeft() + " which matches the filter regex " + dirFilterRegex);
-          return;
+      for (DirectoryInfo dirInfo : foundDirsList) {
+        if (!dirFilterRegex.isEmpty()) {
+          final String relativePath = dirInfo.getRelativePath();
+          if (!relativePath.isEmpty()) {
+            Path partitionPath = new Path(datasetBasePath, relativePath);
+            if (partitionPath.getName().matches(dirFilterRegex)) {
+              LOG.info("Ignoring directory " + partitionPath + " which matches the filter regex " + dirFilterRegex);
+              continue;
+            }
+          }
         }
 
-        List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
-            .filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
-            .collect(Collectors.toList());
-
-        if (p.getRight().length > filesInDir.size()) {
-          String partitionName = FSUtils.getRelativePartitionPath(new Path(dataMetaClient.getBasePath()), p.getLeft());
-          // deal with Non-partition table, we should exclude .hoodie
-          partitionToFileStatus.put(partitionName, filesInDir.stream()
-              .filter(f -> !f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
+        if (dirInfo.isPartition()) {
+          // Add to result
+          foundPartitionsList.add(dirInfo);
         } else {
           // Add sub-dirs to the queue
-          pathsToList.addAll(Arrays.stream(p.getRight())
-              .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
-              .map(fs -> fs.getPath())
-              .collect(Collectors.toList()));
+          pathsToList.addAll(dirInfo.getSubdirs());

Review comment:
       Also, I see this in master before this patch:
   ```
   if (p.getRight().length > filesInDir.size()) {
     String partitionName = FSUtils.getRelativePartitionPath(new Path(dataMetaClient.getBasePath()), p.getLeft());
     // deal with Non-partition table, we should exclude .hoodie
     partitionToFileStatus.put(partitionName, filesInDir.stream()
         .filter(f -> !f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
   }
   ```
   May I know how we are handling the same case (a non-partitioned table, where .hoodie has to be excluded) in this patch?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -419,52 +394,53 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
    * @param dataMetaClient
    * @return Map of partition names to a list of FileStatus for all the files in the partition
    */
-  private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
+  private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient datasetMetaClient) {
     List<Path> pathsToList = new LinkedList<>();
     pathsToList.add(new Path(dataWriteConfig.getBasePath()));
 
-    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+    List<DirectoryInfo> foundPartitionsList = new LinkedList<>();
     final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
     SerializableConfiguration conf = new SerializableConfiguration(dataMetaClient.getHadoopConf());
     final String dirFilterRegex = dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
+    final String datasetBasePath = dataMetaClient.getBasePath();
 
     while (!pathsToList.isEmpty()) {
-      int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
+      // In each round we will list a section of directories
+      int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
       // List all directories in parallel
-      List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
+      List<DirectoryInfo> foundDirsList = engineContext.map(pathsToList.subList(0,  numDirsToList), path -> {

Review comment:
       Minor: can we name this `processedDirectories`?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -419,52 +394,53 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
    * @param dataMetaClient
    * @return Map of partition names to a list of FileStatus for all the files in the partition
    */
-  private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
+  private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient datasetMetaClient) {
     List<Path> pathsToList = new LinkedList<>();
     pathsToList.add(new Path(dataWriteConfig.getBasePath()));
 
-    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+    List<DirectoryInfo> foundPartitionsList = new LinkedList<>();
     final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
     SerializableConfiguration conf = new SerializableConfiguration(dataMetaClient.getHadoopConf());
     final String dirFilterRegex = dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
+    final String datasetBasePath = dataMetaClient.getBasePath();
 
     while (!pathsToList.isEmpty()) {
-      int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
+      // In each round we will list a section of directories
+      int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
       // List all directories in parallel
-      List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
+      List<DirectoryInfo> foundDirsList = engineContext.map(pathsToList.subList(0,  numDirsToList), path -> {
         FileSystem fs = path.getFileSystem(conf.get());
-        return Pair.of(path, fs.listStatus(path));
-      }, listingParallelism);
-      pathsToList.clear();
+        String relativeDirPath = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), path);
+        return new DirectoryInfo(relativeDirPath, fs.listStatus(path));
+      }, numDirsToList);
+
+      pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, pathsToList.size()));
 
       // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
       // the results.
-      dirToFileListing.forEach(p -> {
-        if (!dirFilterRegex.isEmpty() && p.getLeft().getName().matches(dirFilterRegex)) {
-          LOG.info("Ignoring directory " + p.getLeft() + " which matches the filter regex " + dirFilterRegex);
-          return;
+      for (DirectoryInfo dirInfo : foundDirsList) {
+        if (!dirFilterRegex.isEmpty()) {
+          final String relativePath = dirInfo.getRelativePath();
+          if (!relativePath.isEmpty()) {
+            Path partitionPath = new Path(datasetBasePath, relativePath);
+            if (partitionPath.getName().matches(dirFilterRegex)) {
+              LOG.info("Ignoring directory " + partitionPath + " which matches the filter regex " + dirFilterRegex);
+              continue;
+            }
+          }
         }
 
-        List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
-            .filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
-            .collect(Collectors.toList());
-
-        if (p.getRight().length > filesInDir.size()) {
-          String partitionName = FSUtils.getRelativePartitionPath(new Path(dataMetaClient.getBasePath()), p.getLeft());
-          // deal with Non-partition table, we should exclude .hoodie
-          partitionToFileStatus.put(partitionName, filesInDir.stream()
-              .filter(f -> !f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
+        if (dirInfo.isPartition()) {
+          // Add to result
+          foundPartitionsList.add(dirInfo);
         } else {
           // Add sub-dirs to the queue
-          pathsToList.addAll(Arrays.stream(p.getRight())
-              .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
-              .map(fs -> fs.getPath())
-              .collect(Collectors.toList()));
+          pathsToList.addAll(dirInfo.getSubdirs());

Review comment:
       May I know where we are ignoring the meta paths? E.g., L460 before this patch.
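
For reference, the filtering that the pre-patch code applied when queueing sub-directories can be sketched as below. This is a minimal, self-contained sketch and not the patch's code: `SubdirFilterSketch` and `subdirsToQueue` are hypothetical names, plain strings stand in for Hadoop `Path` objects, and the `.hoodie` constant is assumed to match `HoodieTableMetaClient.METAFOLDER_NAME`.

```java
import java.util.ArrayList;
import java.util.List;

class SubdirFilterSketch {
  // Assumed to mirror HoodieTableMetaClient.METAFOLDER_NAME
  static final String METAFOLDER_NAME = ".hoodie";

  // Returns the child directory names that should be listed in a later round,
  // skipping the table's meta folder so it is never treated as a partition.
  static List<String> subdirsToQueue(List<String> childDirNames) {
    List<String> result = new ArrayList<>(childDirNames.size());
    for (String name : childDirNames) {
      if (!METAFOLDER_NAME.equals(name)) {
        result.add(name);
      }
    }
    return result;
  }
}
```

An equivalent check could live where `DirectoryInfo` collects its sub-directories, so that `getSubdirs()` never returns the meta folder in the first place.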

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -419,52 +394,53 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
    * @param dataMetaClient
    * @return Map of partition names to a list of FileStatus for all the files in the partition
    */
-  private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
+  private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient datasetMetaClient) {
     List<Path> pathsToList = new LinkedList<>();
     pathsToList.add(new Path(dataWriteConfig.getBasePath()));
 
-    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+    List<DirectoryInfo> foundPartitionsList = new LinkedList<>();

Review comment:
       Maybe we can name it "partitionsToBootstrap"?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
##########
@@ -36,4 +49,9 @@
   public String getFileExtension() {
     return extension;
   }
+
+  public static boolean isBaseFile(Path path) {

Review comment:
       Do you think we should move this to FSUtils?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -145,15 +152,39 @@ protected void commit(List<HoodieRecord> records, String partitionName, String i
    *
    * The record is tagged with respective file slice's location based on its record key.
    */
-  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
+  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
     List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
     ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
 
-    JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
-    return jsc.parallelize(records, 1).map(r -> {
+    return recordsRDD.map(r -> {
       FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
       r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
       return r;
     });
   }
+
+  @Override
+  protected void commit(List<DirectoryInfo> partitionInfoList, String createInstantTime) {

Review comment:
       We recently added some abstractions to Hudi (HoodieData, HoodieJavaRdd, HoodieList). Can we reuse them to avoid duplication across Flink and Spark?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -145,15 +152,39 @@ protected void commit(List<HoodieRecord> records, String partitionName, String i
    *
    * The record is tagged with respective file slice's location based on its record key.
    */
-  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
+  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
     List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
     ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
 
-    JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
-    return jsc.parallelize(records, 1).map(r -> {
+    return recordsRDD.map(r -> {
       FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
       r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
       return r;
     });
   }
+
+  @Override
+  protected void commit(List<DirectoryInfo> partitionInfoList, String createInstantTime) {

Review comment:
       We can have a single commit() method that operates on HoodieData and abstracts out the engine-specific details.
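
A rough sketch of the idea, under stated assumptions: `RecordData`, `ListRecordData` and `SingleCommitSketch` are hypothetical names invented for this example and do not reproduce the actual HoodieData API. The point is only that a single commit() can be written against an engine-agnostic collection, with each engine supplying its own implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical engine-agnostic collection; it only illustrates the idea behind HoodieData.
interface RecordData<T> {
  <R> RecordData<R> map(Function<T, R> fn);

  List<T> collectAsList();
}

// In-memory implementation; a Spark-backed one would wrap an RDD instead.
class ListRecordData<T> implements RecordData<T> {
  private final List<T> items;

  ListRecordData(List<T> items) {
    this.items = new ArrayList<>(items);
  }

  @Override
  public <R> RecordData<R> map(Function<T, R> fn) {
    List<R> out = new ArrayList<>(items.size());
    for (T item : items) {
      out.add(fn.apply(item));
    }
    return new ListRecordData<>(out);
  }

  @Override
  public List<T> collectAsList() {
    return new ArrayList<>(items);
  }
}

class SingleCommitSketch {
  // Single commit() entry point: tags every record with its target location
  // and returns what would be written. Strings stand in for HoodieRecord.
  static List<String> commit(RecordData<String> records, String partitionName, String instantTime) {
    return records.map(r -> partitionName + "/" + instantTime + "/" + r).collectAsList();
  }
}
```

With this shape, the bootstrap path and the regular path would both convert their input into the shared collection type before calling the same commit().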



