Posted to commits@hudi.apache.org by vi...@apache.org on 2021/01/04 16:00:09 UTC

[hudi] 03/06: [HUDI-1469] Faster initialization of metadata table using parallelized listing. (#2343)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2bd4a68731826698d120765c534cabe6c3069a95
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Thu Dec 31 01:40:12 2020 -0800

    [HUDI-1469] Faster initialization of metadata table using parallelized listing. (#2343)
    
     * [HUDI-1469] Faster initialization of metadata table using parallelized listing which finds partitions and files in a single scan.
     * MINOR fixes
    
    Co-authored-by: Vinoth Chandar <vi...@apache.org>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 12 +++
 .../metadata/HoodieBackedTableMetadataWriter.java  | 89 ++++++++++++++--------
 2 files changed, 68 insertions(+), 33 deletions(-)
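
For readers skimming the patch, a minimal usage sketch of the new knob. Only
withFileListingParallelism(...) is introduced by this commit (see the
HoodieWriteConfig diff below); the newBuilder()/withPath() entry points and the
concrete values are assumed from the existing API, for illustration only:

    import org.apache.hudi.config.HoodieWriteConfig;

    public class FileListingParallelismExample {
      public static void main(String[] args) {
        // Hypothetical base path and parallelism; only the listing knob is new here.
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hoodie/trips")
            .withFileListingParallelism(200) // defaults to DEFAULT_PARALLELISM when unset
            .build();
        System.out.println(writeConfig.getFileListingParallelism()); // prints 200
      }
    }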

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 138f1be..2a26abd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -82,6 +82,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public static final String BULKINSERT_INPUT_DATA_SCHEMA_DDL = "hoodie.bulkinsert.schema.ddl";
   public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
   public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
+  public static final String FILE_LISTING_PARALLELISM = "hoodie.file.listing.parallelism";
   public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
   public static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
   public static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
@@ -256,6 +257,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
   }
 
+  public int getFileListingParallelism() {
+    return Math.max(Integer.parseInt(props.getProperty(FILE_LISTING_PARALLELISM)), 1);
+  }
+
   public boolean shouldRollbackUsingMarkers() {
     return Boolean.parseBoolean(props.getProperty(ROLLBACK_USING_MARKERS));
   }
@@ -1002,6 +1007,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withFileListingParallelism(int parallelism) {
+      props.setProperty(FILE_LISTING_PARALLELISM, String.valueOf(parallelism));
+      return this;
+    }
+
     public Builder withUserDefinedBulkInsertPartitionerClass(String className) {
       props.setProperty(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS, className);
       return this;
@@ -1188,6 +1198,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
+      setDefaultOnCondition(props, !props.containsKey(FILE_LISTING_PARALLELISM), FILE_LISTING_PARALLELISM,
+          DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
           DEFAULT_ROLLBACK_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(KEYGENERATOR_CLASS_PROP),
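
Bridging to the metadata writer changes below: the getter above clamps the
configured value to at least 1, and the listing loop further caps it by the
number of directories pending at each level of the traversal. A hypothetical
standalone helper (not part of the commit) showing that arithmetic:

    // Mirrors the Math.max(...) in getFileListingParallelism() and the
    // Math.min(...) inside getPartitionsToFilesMapping() below.
    static int effectiveListingParallelism(int configured, int pendingPaths) {
      int floored = Math.max(configured, 1);   // guard against a misconfigured 0 or negative value
      return Math.min(floored, pendingPaths);  // never schedule more tasks than directories to list
    }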
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 9282e3b..ed24980 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -274,44 +274,19 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     initTableMetadata();
 
     // List all partitions in the basePath of the containing dataset
-    FileSystem fs = datasetMetaClient.getFs();
-    FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetWriteConfig.getBasePath(),
-        datasetWriteConfig.shouldAssumeDatePartitioning());
-    List<String> partitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
-    LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");
-
-    // List all partitions in parallel and collect the files in them
-    int parallelism =  Math.max(partitions.size(), 1);
-    List<Pair<String, FileStatus[]>> partitionFileList = engineContext.map(partitions, partition -> {
-      FileStatus[] statuses = fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(datasetWriteConfig.getBasePath(), partition));
-      return Pair.of(partition, statuses);
-    }, parallelism);
+    LOG.info("Initializing metadata table by using file listings in " + datasetWriteConfig.getBasePath());
+    Map<String, List<FileStatus>> partitionToFileStatus = getPartitionsToFilesMapping(datasetMetaClient);
 
     // Create a HoodieCommitMetadata with writeStats for all discovered files
     int[] stats = {0};
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
 
-    partitionFileList.forEach(t -> {
-      final String partition = t.getKey();
-      try {
-        if (!fs.exists(new Path(datasetWriteConfig.getBasePath(), partition + Path.SEPARATOR + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) {
-          return;
-        }
-      } catch (IOException e) {
-        throw new HoodieMetadataException("Failed to check partition " + partition, e);
-      }
-
+    partitionToFileStatus.forEach((partition, statuses) -> {
       // Filter the statuses to only include files which were created before or on createInstantTime
-      Arrays.stream(t.getValue()).filter(status -> {
+      statuses.stream().filter(status -> {
         String filename = status.getPath().getName();
-        if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
-          return false;
-        }
-        if (HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
-            createInstantTime)) {
-          return false;
-        }
-        return true;
+        return !HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
+            createInstantTime);
       }).forEach(status -> {
         HoodieWriteStat writeStat = new HoodieWriteStat();
         writeStat.setPath(partition + Path.SEPARATOR + status.getPath().getName());
@@ -329,11 +304,57 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       }
     });
 
-    LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
+    LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
     update(commitMetadata, createInstantTime);
   }
 
   /**
+   * Function to find hoodie partitions and list the files in them, in parallel.
+   *
+   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   * @return Map of partition names to a list of FileStatus for all the files in the partition
+   */
+  private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient datasetMetaClient) {
+    List<Path> pathsToList = new LinkedList<>();
+    pathsToList.add(new Path(datasetWriteConfig.getBasePath()));
+
+    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+    final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
+    SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());
+
+    while (!pathsToList.isEmpty()) {
+      int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
+      // List all directories in parallel
+      List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
+        FileSystem fs = path.getFileSystem(conf.get());
+        return Pair.of(path, fs.listStatus(path));
+      }, listingParallelism);
+      pathsToList.clear();
+
+      // If the listing reveals a hoodie partition, add its files to the results; otherwise add its
+      // sub-directories to the queue for the next level of listing.
+      dirToFileListing.forEach(p -> {
+        List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
+            .filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
+            .collect(Collectors.toList());
+
+        if (p.getRight().length > filesInDir.size()) {
+          // Is a partition. Add all data files to result.
+          partitionToFileStatus.put(p.getLeft().getName(), filesInDir);
+        } else {
+          // Add sub-dirs to the queue
+          pathsToList.addAll(Arrays.stream(p.getRight())
+              .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
+              .map(fs -> fs.getPath())
+              .collect(Collectors.toList()));
+        }
+      });
+    }
+
+    return partitionToFileStatus;
+  }
+
+  /**
    * Sync the Metadata Table from the instants created on the dataset.
    *
    * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
@@ -413,7 +434,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       writeStats.forEach(hoodieWriteStat -> {
         String pathWithPartition = hoodieWriteStat.getPath();
         if (pathWithPartition == null) {
-          throw new HoodieMetadataException("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
+          // Empty partition
+          LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
+          return;
         }
 
         int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;
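
For reference, the parallelized listing distilled into a serial, self-contained
sketch: this mirrors the breadth-first scan that getPartitionsToFilesMapping()
performs, with the metafile/metafolder constants inlined as their literal
values. The real code fans each level out through engineContext.map() with the
parallelism configured above; SerializableConfiguration plumbing and error
handling are omitted, and the class and method names here are hypothetical:

    import java.io.IOException;
    import java.util.*;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class ListingSketch {
      // Literal values of HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE
      // and HoodieTableMetaClient.METAFOLDER_NAME.
      static final String PARTITION_METAFILE = ".hoodie_partition_metadata";
      static final String METAFOLDER = ".hoodie";

      static Map<String, List<FileStatus>> listPartitions(FileSystem fs, Path basePath)
          throws IOException {
        Map<String, List<FileStatus>> partitionToFiles = new HashMap<>();
        Deque<Path> pathsToList = new ArrayDeque<>();
        pathsToList.add(basePath);

        while (!pathsToList.isEmpty()) {
          Path dir = pathsToList.poll();       // the commit lists each level in parallel
          FileStatus[] statuses = fs.listStatus(dir);

          List<FileStatus> filesInDir = new ArrayList<>();
          boolean isPartition = false;
          for (FileStatus status : statuses) {
            if (status.getPath().getName().equals(PARTITION_METAFILE)) {
              isPartition = true;              // the metafile marks a hoodie partition
            } else {
              filesInDir.add(status);
            }
          }

          if (isPartition) {
            // A hoodie partition: record every file except the metafile itself.
            partitionToFiles.put(dir.getName(), filesInDir);
          } else {
            // Not a partition: queue sub-directories, skipping the .hoodie metafolder.
            for (FileStatus status : statuses) {
              if (status.isDirectory() && !status.getPath().getName().equals(METAFOLDER)) {
                pathsToList.add(status.getPath());
              }
            }
          }
        }
        return partitionToFiles;
      }
    }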