Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/18 21:58:58 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6680: [HUDI-4812] lazy fetching partition path & file slice for HoodieFileIndex

alexeykudinkin commented on code in PR #6680:
URL: https://github.com/apache/hudi/pull/6680#discussion_r998681902


##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -310,35 +256,159 @@ private void doRefresh() {
             )
         );
 
-    cachedFileSize = cachedAllInputFileSlices.values().stream()
+    this.cachedFileSize += ret.values().stream()
         .flatMap(Collection::stream)
         .mapToLong(BaseHoodieTableFileIndex::fileSliceSize)
         .sum();
 
-    // If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
-    queryAsNonePartitionedTable = partitionFiles.keySet().stream().anyMatch(p -> p.values.length == 0);
+    return ret;
+  }
 
-    long duration = System.currentTimeMillis() - startTime;
+  /**
+   * Get partition path with the given partition value
+   * @param partitionNames partition names
+   * @param values partition values
+   * @return partitions that match the given partition values
+   */
+  protected List<PartitionPath> getPartitionPaths(String[] partitionNames, String[] values) {
+    if (cachedAllPartitionPaths != null) {
+      LOG.info("All partition paths have already loaded, use it directly");

Review Comment:
   Let's downgrade this to debug



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -347,6 +417,17 @@ private void validate(HoodieTimeline activeTimeline, Option<String> queryInstant
     }
   }
 
+  protected boolean isAllInputFileSlicesCached() {
+    if (cachedAllPartitionPaths == null) {
+      return false;
+    }
+    return cachedAllPartitionPaths.stream().allMatch(p -> cachedAllInputFileSlices.containsKey(p));

Review Comment:
   Let's add a comment to the `cachedAllPartitionPaths` elaborating that it's always fetched fully, and never incrementally (otherwise this check would be incorrect)
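A minimal sketch of how that comment and the invariant could look. It assumes simplified types: plain `String` partitions and file slices stand in for `PartitionPath` and `FileSlice`. The class name `PartitionCacheSketch` and its setter methods are hypothetical:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the caching fields of BaseHoodieTableFileIndex:
// partitions and file slices are plain Strings here instead of PartitionPath/FileSlice.
class PartitionCacheSketch {
  /**
   * Relative paths of all partitions matching the query paths.
   *
   * <p>NOTE: this list is always fetched fully, in a single listing, and never populated
   * incrementally. It is either null (not listed yet) or complete, and
   * {@link #isAllInputFileSlicesCached()} relies on this invariant.
   */
  private List<String> cachedAllPartitionPaths;

  // File slices per partition; unlike the list above, this map may be filled incrementally.
  private final Map<String, List<String>> cachedAllInputFileSlices = new HashMap<>();

  void cacheAllPartitionPaths(List<String> allPartitionPaths) {
    this.cachedAllPartitionPaths = allPartitionPaths;
  }

  void cacheFileSlices(String partitionPath, List<String> fileSlices) {
    cachedAllInputFileSlices.put(partitionPath, fileSlices);
  }

  boolean isAllInputFileSlicesCached() {
    if (cachedAllPartitionPaths == null) {
      return false;
    }
    // Only correct if cachedAllPartitionPaths is a complete listing (see the note above).
    return cachedAllPartitionPaths.stream().allMatch(cachedAllInputFileSlices::containsKey);
  }
}
```

The check only asks whether every listed partition has an entry in the map. If `cachedAllPartitionPaths` ever held a partial listing, the check would wrongly return true.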



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -187,12 +189,16 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
    * @param predicates     The filter condition.
    * @return The pruned partition paths.
    */
-  protected def prunePartition(partitionPaths: Seq[PartitionPath], predicates: Seq[Expression]): Seq[PartitionPath] = {
+  protected def prunePartition(predicates: Seq[Expression]): Seq[PartitionPath] = {
     val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
     val partitionPruningPredicates = predicates.filter {
       _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
     }
     if (partitionPruningPredicates.nonEmpty) {
+      val equalPredicate = partitionPruningPredicates.filter(_.isInstanceOf[EqualTo])
+      val names = equalPredicate.map(_.asInstanceOf[EqualTo]).map(_.left.asInstanceOf[AttributeReference].name).toArray

Review Comment:
   > By the way, currently I put an option to control the loading behavior, REFRESH_PARTITION_AND_FILES_IN_INITIALIZATION. With the above semantics, maybe we can remove it and let it be handled automatically. WDYT?
   
   Sorry, not sure I fully follow this one: what do you want to remove?



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -298,7 +244,7 @@ private void doRefresh() {
     // This logic is realized by `AbstractTableFileSystemView::getLatestMergedFileSlicesBeforeOrOn`
     // API.  Note that for COW table, the merging logic of two slices does not happen as there
     // is no compaction, thus there is no performance impact.
-    cachedAllInputFileSlices = partitionFiles.keySet().stream()
+    Map<PartitionPath, List<FileSlice>> ret = partitionFiles.keySet().stream()

Review Comment:
   nit: `listedPartitions`



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -310,35 +256,159 @@ private void doRefresh() {
             )
         );
 
-    cachedFileSize = cachedAllInputFileSlices.values().stream()
+    this.cachedFileSize += ret.values().stream()
         .flatMap(Collection::stream)
         .mapToLong(BaseHoodieTableFileIndex::fileSliceSize)
         .sum();
 
-    // If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
-    queryAsNonePartitionedTable = partitionFiles.keySet().stream().anyMatch(p -> p.values.length == 0);
+    return ret;
+  }
 
-    long duration = System.currentTimeMillis() - startTime;
+  /**
+   * Get partition path with the given partition value
+   * @param partitionNames partition names
+   * @param values partition values
+   * @return partitions that match the given partition values
+   */
+  protected List<PartitionPath> getPartitionPaths(String[] partitionNames, String[] values) {

Review Comment:
   Let's make it accept `List<Pair<String, String>> partitionColumnValuePairs`
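A caller-side sketch of the suggested signature. To keep the example self-contained, `java.util.Map.Entry` stands in for Hudi's `Pair`, and `toColumnValuePairs` is a hypothetical helper:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class PartitionPairsSketch {
  // Zips parallel name/value arrays into (column, value) pairs once, at the call site.
  // Map.Entry is used here only as a stand-in for Hudi's Pair<String, String>.
  static List<Map.Entry<String, String>> toColumnValuePairs(String[] partitionNames, String[] values) {
    if (partitionNames.length != values.length) {
      throw new IllegalArgumentException("Partition names and values must have the same length");
    }
    List<Map.Entry<String, String>> pairs = new ArrayList<>(partitionNames.length);
    for (int i = 0; i < partitionNames.length; i++) {
      pairs.add(new SimpleImmutableEntry<>(partitionNames[i], values[i]));
    }
    return pairs;
  }
}
```

With a single list of pairs, a name can no longer drift out of sync with its value. The length-mismatch guard that a later revision of `getPartitionPaths` performs (`partitionNames.length != values.length`) is then no longer needed inside the method.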



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java:
##########
@@ -132,6 +132,11 @@ static HoodieBackedTableMetadata createHoodieBackedTableMetadata(HoodieEngineCon
    */
   FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException;
 
+  /**
+   * Fetch list of all partitions path that match the given prefix

Review Comment:
   Let's make it very clear that the prefix here is referring to "partition-path being the prefix of the complete relative partition path"
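The diff cuts off the body of `getPartitionPathsWithPrefix` after `getAllPartitionPaths().stream()`, so its exact filter isn't shown. One way to make the "prefix of the complete relative partition path" contract concrete is to match whole path segments. The sketch below is a hypothetical illustration of that reading, not the `HoodieBackedTableMetadata` implementation:

```java
import java.util.List;
import java.util.stream.Collectors;

class PartitionPrefixSketch {
  // Keeps the relative partition paths whose leading path segments equal the prefix, so
  // "2022/10" matches "2022/10/18" but not "2022/100/01" (a plain startsWith would match both).
  static List<String> filterByPartitionPathPrefix(List<String> relativePartitionPaths, String prefix) {
    if (prefix.isEmpty()) {
      return relativePartitionPaths; // an empty prefix selects every partition
    }
    return relativePartitionPaths.stream()
        .filter(path -> path.equals(prefix) || path.startsWith(prefix + "/"))
        .collect(Collectors.toList());
  }
}
```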



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -310,35 +256,159 @@ private void doRefresh() {
             )
         );
 
-    cachedFileSize = cachedAllInputFileSlices.values().stream()
+    this.cachedFileSize += ret.values().stream()
         .flatMap(Collection::stream)
         .mapToLong(BaseHoodieTableFileIndex::fileSliceSize)
         .sum();
 
-    // If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
-    queryAsNonePartitionedTable = partitionFiles.keySet().stream().anyMatch(p -> p.values.length == 0);
+    return ret;
+  }
 
-    long duration = System.currentTimeMillis() - startTime;
+  /**
+   * Get partition path with the given partition value
+   * @param partitionNames partition names
+   * @param values partition values
+   * @return partitions that match the given partition values
+   */
+  protected List<PartitionPath> getPartitionPaths(String[] partitionNames, String[] values) {
+    if (cachedAllPartitionPaths != null) {
+      LOG.info("All partition paths have already loaded, use it directly");
+      return cachedAllPartitionPaths;
+    }
 
-    LOG.info(String.format("Refresh table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration));
+    Pair<String, Boolean> relativeQueryPartitionPathPair = composeRelativePartitionPaths(partitionNames, values);
+    // If the composed partition path is complete, we return it directly, to save extra DFS listing operations.
+    if (relativeQueryPartitionPathPair.getRight()) {
+      return Collections.singletonList(new PartitionPath(relativeQueryPartitionPathPair.getLeft(),
+          parsePartitionColumnValues(partitionColumns, relativeQueryPartitionPathPair.getLeft())));
+    }
+    // The input partition values (from query predicate) forms a prefix of partition path, do listing to the path only.
+    return listPartitionPaths(Collections.singletonList(relativeQueryPartitionPathPair.getLeft()));
   }
 
-  private Map<String, FileStatus[]> getAllFilesInPartitionsUnchecked(Collection<String> fullPartitionPathsMapToFetch) {
-    try {
-      return tableMetadata.getAllFilesInPartitions(new ArrayList<>(fullPartitionPathsMapToFetch));
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to list partition paths for a table", e);
+  /**
+   * Construct relative partition (i.e., partition prefix) from the given partition values

Review Comment:
   nit: "relative partition path"



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -310,35 +256,159 @@ private void doRefresh() {
             )
         );
 
-    cachedFileSize = cachedAllInputFileSlices.values().stream()
+    this.cachedFileSize += ret.values().stream()
         .flatMap(Collection::stream)
         .mapToLong(BaseHoodieTableFileIndex::fileSliceSize)
         .sum();
 
-    // If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
-    queryAsNonePartitionedTable = partitionFiles.keySet().stream().anyMatch(p -> p.values.length == 0);
+    return ret;
+  }
 
-    long duration = System.currentTimeMillis() - startTime;
+  /**
+   * Get partition path with the given partition value
+   * @param partitionNames partition names
+   * @param values partition values
+   * @return partitions that match the given partition values
+   */
+  protected List<PartitionPath> getPartitionPaths(String[] partitionNames, String[] values) {
+    if (cachedAllPartitionPaths != null) {
+      LOG.info("All partition paths have already loaded, use it directly");
+      return cachedAllPartitionPaths;
+    }
 
-    LOG.info(String.format("Refresh table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration));
+    Pair<String, Boolean> relativeQueryPartitionPathPair = composeRelativePartitionPaths(partitionNames, values);

Review Comment:
   We actually don't need to return a boolean flag; we can detect whether this would be a full path here as well (by comparing the number of values against the number of partition columns)
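A sketch of the reviewer's suggestion. It assumes non-hive-style partitioning and values already ordered like the table's partition columns, and both method names are hypothetical. The composed path is returned on its own, and completeness is derived from the counts:

```java
import java.util.List;

class RelativePartitionPathSketch {
  // Joins the given values (already ordered like the partition columns) into a relative
  // partition path; no boolean flag is returned alongside it.
  static String composeRelativePartitionPath(List<String> orderedValues) {
    return String.join("/", orderedValues);
  }

  // The path is complete (not just a prefix) when every partition column received a value.
  static boolean isCompletePartitionPath(List<String> orderedValues, String[] partitionColumns) {
    return orderedValues.size() == partitionColumns.length;
  }
}
```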



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -139,7 +142,20 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
     this.engineContext = engineContext;
     this.fileStatusCache = fileStatusCache;
 
-    doRefresh();
+    /**
+     * The `shouldListLazily` variable controls how we initialize the TableFileIndex:
+     *  - non-lazy/eager listing (shouldListLazily=true):  all partitions and file slices will be loaded eagerly during initialization.

Review Comment:
   Typo: should be `shouldListLazily=false`
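Here is a minimal sketch of the intended flag semantics. With `shouldListLazily=false`, the constructor lists eagerly; with `shouldListLazily=true`, listing is deferred to the first call. The class is hypothetical, and the DFS listing is simulated with a fixed list:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the two initialization modes; the listing itself is simulated.
class LazyListingSketch {
  private List<String> cachedAllPartitionPaths; // null until the first listing
  private int listingCount = 0;                 // number of (simulated) listings performed

  LazyListingSketch(boolean shouldListLazily) {
    if (!shouldListLazily) {
      // Eager listing (shouldListLazily=false): load all partitions during initialization.
      getAllQueryPartitionPaths();
    }
    // Lazy listing (shouldListLazily=true): defer until a query actually needs the partitions.
  }

  List<String> getAllQueryPartitionPaths() {
    if (cachedAllPartitionPaths == null) {
      listingCount++;
      cachedAllPartitionPaths = Arrays.asList("2022/10/17", "2022/10/18"); // stand-in for a DFS listing
    }
    return cachedAllPartitionPaths;
  }

  int listingCount() {
    return listingCount;
  }
}
```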



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -310,35 +256,159 @@ private void doRefresh() {
             )
         );
 
-    cachedFileSize = cachedAllInputFileSlices.values().stream()
+    this.cachedFileSize += ret.values().stream()
         .flatMap(Collection::stream)
         .mapToLong(BaseHoodieTableFileIndex::fileSliceSize)
         .sum();
 
-    // If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
-    queryAsNonePartitionedTable = partitionFiles.keySet().stream().anyMatch(p -> p.values.length == 0);
+    return ret;
+  }
 
-    long duration = System.currentTimeMillis() - startTime;
+  /**
+   * Get partition path with the given partition value
+   * @param partitionNames partition names
+   * @param values partition values
+   * @return partitions that match the given partition values
+   */
+  protected List<PartitionPath> getPartitionPaths(String[] partitionNames, String[] values) {
+    if (cachedAllPartitionPaths != null) {
+      LOG.info("All partition paths have already loaded, use it directly");
+      return cachedAllPartitionPaths;
+    }
 
-    LOG.info(String.format("Refresh table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration));
+    Pair<String, Boolean> relativeQueryPartitionPathPair = composeRelativePartitionPaths(partitionNames, values);
+    // If the composed partition path is complete, we return it directly, to save extra DFS listing operations.
+    if (relativeQueryPartitionPathPair.getRight()) {
+      return Collections.singletonList(new PartitionPath(relativeQueryPartitionPathPair.getLeft(),
+          parsePartitionColumnValues(partitionColumns, relativeQueryPartitionPathPair.getLeft())));
+    }
+    // The input partition values (from query predicate) forms a prefix of partition path, do listing to the path only.
+    return listPartitionPaths(Collections.singletonList(relativeQueryPartitionPathPair.getLeft()));
   }
 
-  private Map<String, FileStatus[]> getAllFilesInPartitionsUnchecked(Collection<String> fullPartitionPathsMapToFetch) {
-    try {
-      return tableMetadata.getAllFilesInPartitions(new ArrayList<>(fullPartitionPathsMapToFetch));
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to list partition paths for a table", e);
+  /**
+   * Construct relative partition (i.e., partition prefix) from the given partition values
+   * @return relative partition path and a flag to indicate if the path is complete (i.e., not a prefix)
+   */
+  private Pair<String, Boolean> composeRelativePartitionPaths(String[] partitionNames, String[] values) {

Review Comment:
   This is somewhat non-trivial logic and we need to make sure we're not duplicating it: we already have all of this handled w/in `BuiltinKeyGenerator.combinePartitionPathInternal`, so let's generalize it and apply it here
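The signature of `BuiltinKeyGenerator.combinePartitionPathInternal` isn't shown here, so the following is only a hypothetical shape for a shared helper that both the key generators and the file index could call. It takes the value encoder as a parameter. Real code could therefore pass its existing escaping routine instead of duplicating it. One candidate is `PartitionPathEncodeUtils::escapePathName`, which a later revision of this PR uses:

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical shared helper; the class name and signature are illustrative, not Hudi's API.
final class PartitionPathComposer {
  private PartitionPathComposer() {}

  // Builds "col1=v1/col2=v2" (hive-style) or "v1/v2", encoding each value with valueEncoder.
  static String combine(List<String> columnNames, List<String> values,
                        boolean hiveStylePartitioning, UnaryOperator<String> valueEncoder) {
    if (columnNames.size() != values.size()) {
      throw new IllegalArgumentException("Each partition column needs exactly one value");
    }
    StringBuilder path = new StringBuilder();
    for (int i = 0; i < columnNames.size(); i++) {
      if (i > 0) {
        path.append('/'); // separator only between segments, so no trailing '/' to trim
      }
      if (hiveStylePartitioning) {
        path.append(columnNames.get(i)).append('=');
      }
      path.append(valueEncoder.apply(values.get(i)));
    }
    return path.toString();
  }
}
```

Emitting `/` only between segments also removes the need to trim a trailing separator afterwards. In the later revision, `queryPartitionPath.deleteCharAt(queryPartitionPath.length() - 1)` throws when no leading partition column matched and the builder is empty.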
   



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -347,6 +417,17 @@ private void validate(HoodieTimeline activeTimeline, Option<String> queryInstant
     }
   }
 
+  protected boolean isAllInputFileSlicesCached() {
+    if (cachedAllPartitionPaths == null) {
+      return false;
+    }
+    return cachedAllPartitionPaths.stream().allMatch(p -> cachedAllInputFileSlices.containsKey(p));
+  }
+
+  protected boolean isPartitionedTable() {
+    return !queryAsNonePartitionedTable && (partitionColumns.length > 0 || HoodieTableMetadata.isMetadataTable(basePath.toString()));

Review Comment:
   We actually should remove `queryAsNonePartitionedTable` completely



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -165,7 +164,7 @@ case class HoodieFileIndex(spark: SparkSession,
 
       logInfo(s"Total base files: $totalFileSize; " +
         s"candidate files after data skipping : $candidateFileSize; " +
-        s"skipping percent ${if (allFiles.nonEmpty && totalFileSize > 0) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}")
+        s"skipping percent ${if (!isAllInputFileSlicesCached) "is disable" else if (allFiles.nonEmpty && totalFileSize > 0) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}")

Review Comment:
   nit: "is disabled"



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = getQueryPartitionPaths(queryRelativePartitionPaths);
+
+    // If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
+    this.queryAsNonePartitionedTable = this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+    return this.cachedAllPartitionPaths;
+  }
+
+  protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+    if (!isAllInputFileSlicesCached) {
+      doRefresh();
+    }
+    return cachedAllInputFileSlices;
+  }
+
+  /**
+   * Get input file slice for the given partition. Will use cache directly if it is computed before.
+   */
+  protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+    return cachedAllInputFileSlices.computeIfAbsent(partition, (p) -> {
+      FileStatus[] files = loadPartitionPathFiles(p);
+      HoodieTimeline activeTimeline = getActiveTimeline();
+      Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+      HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+      Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::getTimestamp));
+
+      validate(activeTimeline, queryInstant);
+
+      List<FileSlice> ret;
+      if (tableType.equals(HoodieTableType.MERGE_ON_READ) && queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+        ret = queryInstant.map(instant ->
+                fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path, queryInstant.get())
+                    .collect(Collectors.toList())
+            )
+            .orElse(Collections.emptyList());
+      } else {
+        ret = queryInstant.map(instant ->
+                fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant, true)
+            )
+            .orElse(fileSystemView.getLatestFileSlices(p.path))
+            .collect(Collectors.toList());
+      }
+
+      cachedFileSize += ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+      return ret;
+    });
+  }
+
+  /**
+   * Get partition path with the given partition value
+   * @param partitionNames partition names
+   * @param values partition values
+   * @return partitions that match the given partition values
+   */
+  protected List<PartitionPath> getPartitionPaths(String[] partitionNames, String[] values) {
+    if (partitionNames.length == 0 || partitionNames.length != values.length) {
+      LOG.info("The input partition names or value is empty, fallback to return all partition paths");
+      return getAllQueryPartitionPaths();
+    }
+
+    if (cachedAllPartitionPaths != null) {
+      LOG.info("All partition paths have already loaded, use it directly");
+      return cachedAllPartitionPaths;
+    }
+
+    boolean hiveStylePartitioning = Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+    boolean urlEncodePartitioning = Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());
+    Map<String, Integer> partitionNamesToIdx = IntStream.range(0, partitionNames.length)
+        .mapToObj(i -> Pair.of(i, partitionNames[i]))
+        .collect(Collectors.toMap(Pair::getValue, Pair::getKey));
+    StringBuilder queryPartitionPath = new StringBuilder();
+    int idx = 0;
+    for (; idx < partitionNames.length; ++idx) {
+      String columnNames = this.partitionColumns[idx];
+      if (partitionNamesToIdx.containsKey(columnNames)) {
+        int k = partitionNamesToIdx.get(columnNames);
+        String value =  urlEncodePartitioning ? PartitionPathEncodeUtils.escapePathName(values[k]) : values[k];
+        queryPartitionPath.append(hiveStylePartitioning ? columnNames + "=" : "").append(value).append("/");
+      } else {
+        break;
+      }
+    }
+    queryPartitionPath.deleteCharAt(queryPartitionPath.length() - 1);
+    // Return directly if all partition values are specified.
+    if (idx == this.partitionColumns.length) {
+      return Collections.singletonList(new PartitionPath(queryPartitionPath.toString(), parsePartitionColumnValues(partitionColumns, queryPartitionPath.toString())));
+    }
+    return getQueryPartitionPaths(Collections.singletonList(queryPartitionPath.toString()));
+  }
+
+  private List<PartitionPath> getQueryPartitionPaths(List<String> queryRelativePartitionPaths) {
+    List<String> matchedPartitionPaths = queryRelativePartitionPaths.stream()
+        .flatMap(prefix -> {
+          try {
+            // Handle wildcard specially. This will have FileIndex to query the table as non-partitioned-table
+            if (prefix.contains("*")) {

Review Comment:
   I don't think we should be handling it here, though: path globbing is handled w/in the `DefaultSource` and shouldn't be reaching the FileIndex
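If globbing is resolved upstream, the file index could treat a leftover wildcard as a caller error instead of switching to a non-partitioned query. The following minimal, hypothetical guard illustrates that choice. The class and method names are not from the PR:

```java
import java.util.List;

class GlobGuardSketch {
  // By the time paths reach the file index, globs are assumed to be expanded upstream,
  // so a remaining '*' is reported as a bug rather than interpreted as a query mode.
  static void checkNoGlobs(List<String> queryRelativePartitionPaths) {
    for (String path : queryRelativePartitionPaths) {
      if (path.contains("*")) {
        throw new IllegalArgumentException("Unexpected glob pattern in query path: " + path);
      }
    }
  }
}
```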



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -143,6 +143,13 @@ protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key,
     return recordsByKeys.size() == 0 ? Option.empty() : recordsByKeys.get(0).getValue();
   }
 
+  @Override
+  public List<String> getPartitionPathsWithPrefix(String prefix) throws IOException {
+    return getAllPartitionPaths().stream()

Review Comment:
   Correct



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -310,35 +256,159 @@ private void doRefresh() {
             )
         );
 
-    cachedFileSize = cachedAllInputFileSlices.values().stream()
+    this.cachedFileSize += ret.values().stream()
         .flatMap(Collection::stream)
         .mapToLong(BaseHoodieTableFileIndex::fileSliceSize)
         .sum();
 
-    // If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
-    queryAsNonePartitionedTable = partitionFiles.keySet().stream().anyMatch(p -> p.values.length == 0);
+    return ret;
+  }
 
-    long duration = System.currentTimeMillis() - startTime;
+  /**
+   * Get partition path with the given partition value
+   * @param partitionNames partition names
+   * @param values partition values
+   * @return partitions that match the given partition values
+   */
+  protected List<PartitionPath> getPartitionPaths(String[] partitionNames, String[] values) {
+    if (cachedAllPartitionPaths != null) {
+      LOG.info("All partition paths have already loaded, use it directly");
+      return cachedAllPartitionPaths;
+    }
 
-    LOG.info(String.format("Refresh table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration));
+    Pair<String, Boolean> relativeQueryPartitionPathPair = composeRelativePartitionPaths(partitionNames, values);

Review Comment:
   We also need to make sure that the partition columns are ordered in the same way they are w/in the partition schema (otherwise paths won't match)
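A minimal sketch of that ordering step. It assumes equality-predicate values arrive keyed by column name in arbitrary order, and `orderedLeadingValues` is a hypothetical helper. The helper walks the partition columns in schema order and stops at the first column without a value, because only a leading run of columns can form a path prefix:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class PartitionOrderingSketch {
  // Reorders predicate values to follow the table's partition columns and keeps only the
  // leading run of columns that have a value (a gap ends the usable path prefix).
  static List<String> orderedLeadingValues(String[] partitionColumns, Map<String, String> valuesByColumn) {
    List<String> ordered = new ArrayList<>();
    for (String column : partitionColumns) {
      String value = valuesByColumn.get(column);
      if (value == null) {
        break;
      }
      ordered.add(value);
    }
    return ordered;
  }
}
```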



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -138,6 +138,14 @@ object DataSourceReadOptions {
         " read from the data file (in Hudi partition columns are persisted by default)." +
         " This config is a fallback allowing to preserve existing behavior, and should not be used otherwise.")
 
+  val REFRESH_PARTITION_AND_FILES_IN_INITIALIZATION: ConfigProperty[Boolean] =

Review Comment:
   @YuweiXiao the other way around:
    - We should list lazily by default
    - We should add config to override this behavior and fallback to eager listing (if necessary)
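A sketch of the suggested default, assuming a boolean opt-in property. The key `example.fileindex.eager.listing` is a placeholder, not a real Hudi config:

```java
import java.util.Properties;

class ListingModeSketch {
  // Placeholder key for illustration only; not an actual Hudi configuration property.
  static final String EAGER_LISTING_KEY = "example.fileindex.eager.listing";

  // Lazy listing is the default; eager listing applies only when the user explicitly opts in.
  static boolean shouldListLazily(Properties props) {
    boolean eager = Boolean.parseBoolean(props.getProperty(EAGER_LISTING_KEY, "false"));
    return !eager;
  }
}
```

Framing the config as an opt-in to eager listing keeps the default path lazy, so users pay for a full table listing only when they ask for it.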



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org