Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/08 20:59:20 UTC

[GitHub] [hudi] yihua commented on a change in pull request #4948: [HUDI-3514] Rebase Data Skipping flow to rely on MT Column Stats index

yihua commented on a change in pull request #4948:
URL: https://github.com/apache/hudi/pull/4948#discussion_r822042309



##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -194,77 +192,102 @@ case class HoodieFileIndex(spark: SparkSession,
    * @param queryFilters list of original data filters passed down from querying engine
    * @return list of pruned (data-skipped) candidate base-files' names
    */
-  private def lookupCandidateFilesInColStatsIndex(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
-    val indexPath = metaClient.getColumnStatsIndexPath
+  private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
     val fs = metaClient.getFs
+    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)

Review comment:
       Is the plan to deprecate the existing column stats under `.hoodie/.colstatsindex` and remove all usage of it in 0.11?  If not, should we have two modes, where the metadata table's col stats index is used when the metadata table is enabled, and `.hoodie/.colstatsindex` is used when it is disabled?
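
       A minimal Scala sketch of that fallback (not the PR's code; the object, method, and
       parameters below are illustrative): prefer the metadata table's `column_stats` partition
       when the metadata table is enabled, otherwise fall back to the legacy
       `.hoodie/.colstatsindex` layout, which in practice would mean loading the latest
       completed-instant subfolder as the removed code did.

           import org.apache.hadoop.fs.{FileSystem, Path}
           import org.apache.spark.sql.{DataFrame, SparkSession}

           object ColStatsLookup {
             // Illustrative two-mode selection; the two paths would come from
             // HoodieTableMetadata.getMetadataTableBasePath and metaClient.getColumnStatsIndexPath.
             def loadColStats(spark: SparkSession,
                              fs: FileSystem,
                              metadataTableEnabled: Boolean,
                              metadataColStatsPath: String,
                              legacyColStatsIndexPath: String): Option[DataFrame] =
               if (metadataTableEnabled) {
                 Some(spark.read.format("org.apache.hudi").load(metadataColStatsPath))
               } else if (fs.exists(new Path(legacyColStatsIndexPath))) {
                 // Simplified: the legacy reader would actually pick the latest index table
                 Some(spark.read.load(legacyColStatsIndexPath))
               } else {
                 None
               }
           }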

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java
##########
@@ -92,12 +92,21 @@ private static boolean isInsideTableMetadataFolder(String path) {
         HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, partitionPath);
         metadata.readFromFS();
         return Option.of(getNthParent(partitionPath, metadata.getPartitionDepth()));
+      } else {
+        // Simply traverse directory structure until found .hoodie folder
+        Path current = partitionPath;
+        while (current != null) {
+          if (hasTableMetadataFolder(fs, current)) {

Review comment:
       One caveat is that this may incur more than one `fs.exists()` call.  Is this only used for initialization (which is fine), e.g., getting the table path from config, and not for core read/write logic per data file?
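
       A minimal Scala sketch of the traversal in the diff, just to make the cost explicit
       (the object and method names are mine, not the PR's): each probed parent level costs
       one `fs.exists()` RPC, so a partition nested N levels deep may issue up to N calls
       before the `.hoodie` folder is found.

           import org.apache.hadoop.fs.{FileSystem, Path}

           object TableRootLookup {
             def findTableRoot(fs: FileSystem, partitionPath: Path): Option[Path] =
               Iterator.iterate(partitionPath)(_.getParent)
                 .takeWhile(_ != null)
                 .find(p => fs.exists(new Path(p, ".hoodie"))) // one RPC per probed level
           }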

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -194,77 +192,102 @@ case class HoodieFileIndex(spark: SparkSession,
    * @param queryFilters list of original data filters passed down from querying engine
    * @return list of pruned (data-skipped) candidate base-files' names
    */
-  private def lookupCandidateFilesInColStatsIndex(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
-    val indexPath = metaClient.getColumnStatsIndexPath
+  private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
     val fs = metaClient.getFs
+    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)
 
-    if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || queryFilters.isEmpty) {
-      // scalastyle:off return
-      return Success(Option.empty)
-      // scalastyle:on return
-    }
-
-    val completedCommits = getActiveTimeline.filterCompletedInstants().getInstants.iterator.asScala.toList.map(_.getTimestamp)
-
-    // Collect all index tables present in `.zindex` folder
-    val candidateIndexTables =
-      fs.listStatus(new Path(indexPath))
-        .filter(_.isDirectory)
-        .map(_.getPath.getName)
-        .filter(completedCommits.contains(_))
-        .sortBy(x => x)
-
-    if (candidateIndexTables.isEmpty) {
-      // scalastyle:off return
-      return Success(Option.empty)
-      // scalastyle:on return
-    }
-
-    val dataFrameOpt = try {
-      Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString))
-    } catch {
-      case t: Throwable =>
-        logError("Failed to read col-stats index; skipping", t)
-        None
+    if (!isDataSkippingEnabled() || !fs.exists(new Path(metadataTablePath)) || queryFilters.isEmpty) {
+      Option.empty
+    } else {
+      val targetColStatsIndexColumns = Seq(
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
+
+      val requiredMetadataIndexColumns =
+        (targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
+          s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
+
+      // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
+      val metadataTableDF = spark.read.format("org.apache.hudi")
+        .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
+
+      // TODO filter on (column, partition) prefix
+      val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
+        .select(requiredMetadataIndexColumns.map(col): _*)
+
+      val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
+
+      // Persist DF to avoid re-computing column statistics unraveling
+      withPersistence(colStatsDF) {
+        // Metadata Table bears rows in the following format
+        //
+        //  +---------------------------+------------+------------+------------+-------------+
+        //  |        fileName           | columnName |  minValue  |  maxValue  |  num_nulls  |
+        //  +---------------------------+------------+------------+------------+-------------+
+        //  | one_base_file.parquet     |          A |          1 |         10 |           0 |
+        //  | another_base_file.parquet |          A |        -10 |          0 |           5 |
+        //  +---------------------------+------------+------------+------------+-------------+
+        //
+        // While Data Skipping utils are expecting following (transposed) format, where per-column stats are
+        // essentially transposed (from rows to columns):
+        //
+        //  +---------------------------+------------+------------+-------------+
+        //  |          file             | A_minValue | A_maxValue | A_num_nulls |
+        //  +---------------------------+------------+------------+-------------+
+        //  | one_base_file.parquet     |          1 |         10 |           0 |
+        //  | another_base_file.parquet |        -10 |          0 |           5 |
+        //  +---------------------------+------------+------------+-------------+
+        //
+        // NOTE: Column Stats Index might potentially contain statistics for many columns (if not all), while
+        //       query at hand might only be referencing a handful of those. As such, we collect all the
+        //       column references from the filtering expressions, and only transpose records corresponding to the
+        //       columns referenced in those
+        val transposedColStatsDF =
+        queryReferencedColumns.map(colName =>
+          colStatsDF.filter(col(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).equalTo(colName))
+            .select(targetColStatsIndexColumns.map(col): _*)
+            .withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, getNumNullsColumnNameFor(colName))
+            .withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, getMinColumnNameFor(colName))
+            .withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, getMaxColumnNameFor(colName))
+        )
+          .reduceLeft((left, right) =>
+            left.join(right, usingColumn = HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME))

Review comment:
       This may not scale well with a large number of columns from the predicates, as DataFrame joins are expensive even with caching.  I'm wondering if a different DAG should be written for the metadata table col stats, i.e., one row of col stats per file + column.  Conceptually, I think such joining can be avoided when pruning the files.
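
       One possible join-free shape, sketched in Scala (not asserting this is how it should
       land): group the per-(file, column) stats rows by file name and pivot on the column
       name, instead of one join per referenced column.  With multiple aggregations Spark
       names the pivoted columns `<pivotValue>_<aggAlias>`, e.g. `A_minValue`, which already
       matches the transposed layout expected downstream.  Field constants follow the quoted
       diff; `queryReferencedColumns` is the same list computed there.

           import org.apache.hudi.metadata.HoodieMetadataPayload
           import org.apache.spark.sql.DataFrame
           import org.apache.spark.sql.functions.first

           object ColStatsTranspose {
             def transposeWithoutJoins(colStatsDF: DataFrame,
                                       queryReferencedColumns: Seq[String]): DataFrame =
               colStatsDF
                 .groupBy(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
                 .pivot(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME, queryReferencedColumns)
                 .agg(
                   first(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE).as("minValue"),
                   first(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE).as("maxValue"),
                   first(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT).as("num_nulls"))
           }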

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -194,77 +192,102 @@ case class HoodieFileIndex(spark: SparkSession,
    * @param queryFilters list of original data filters passed down from querying engine
    * @return list of pruned (data-skipped) candidate base-files' names
    */
-  private def lookupCandidateFilesInColStatsIndex(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
-    val indexPath = metaClient.getColumnStatsIndexPath
+  private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
     val fs = metaClient.getFs
+    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)

Review comment:
       I also need clarification on how `.hoodie/.colstatsindex` is generated.  Does that come from clustering, or is it also updated on every write?

##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -109,6 +109,14 @@
                                 "string"
                             ]
                         },
+                        {
+                            "doc": "Column name for which this column statistics applies",

Review comment:
       Adding to @codope 's point, does this break the read/write of metadata records in an existing metadata table if users enabled it in older releases, e.g., 0.10.0 and 0.10.1?
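
       Not from the PR, just a small Scala sketch of how this could be sanity-checked with
       Avro's built-in `SchemaCompatibility` helper; the inline schemas are toy stand-ins for
       the 0.10.x and new `HoodieMetadata.avsc` record schemas, and whether the real change
       preserves compatibility is exactly the question above.

           import org.apache.avro.{Schema, SchemaCompatibility}

           object MetadataSchemaCompatCheck {
             def main(args: Array[String]): Unit = {
               // Toy stand-ins: an "old" writer schema vs. a "new" reader schema that adds a
               // nullable field with a default.
               val oldSchema = new Schema.Parser().parse(
                 """{"type":"record","name":"Rec","fields":[{"name":"key","type":"string"}]}""")
               val newSchema = new Schema.Parser().parse(
                 """{"type":"record","name":"Rec","fields":[
                   |  {"name":"key","type":"string"},
                   |  {"name":"columnName","type":["null","string"],"default":null}
                   |]}""".stripMargin)

               // Can a reader on the new schema still read records written with the old one?
               val result = SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema)
               println(result.getType) // COMPATIBLE as long as every added field has a default
             }
           }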

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -194,77 +192,102 @@ case class HoodieFileIndex(spark: SparkSession,
    * @param queryFilters list of original data filters passed down from querying engine
    * @return list of pruned (data-skipped) candidate base-files' names
    */
-  private def lookupCandidateFilesInColStatsIndex(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
-    val indexPath = metaClient.getColumnStatsIndexPath
+  private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
     val fs = metaClient.getFs
+    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)
 
-    if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || queryFilters.isEmpty) {
-      // scalastyle:off return
-      return Success(Option.empty)
-      // scalastyle:on return
-    }
-
-    val completedCommits = getActiveTimeline.filterCompletedInstants().getInstants.iterator.asScala.toList.map(_.getTimestamp)
-
-    // Collect all index tables present in `.zindex` folder
-    val candidateIndexTables =
-      fs.listStatus(new Path(indexPath))
-        .filter(_.isDirectory)
-        .map(_.getPath.getName)
-        .filter(completedCommits.contains(_))
-        .sortBy(x => x)
-
-    if (candidateIndexTables.isEmpty) {
-      // scalastyle:off return
-      return Success(Option.empty)
-      // scalastyle:on return
-    }
-
-    val dataFrameOpt = try {
-      Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString))
-    } catch {
-      case t: Throwable =>
-        logError("Failed to read col-stats index; skipping", t)
-        None
+    if (!isDataSkippingEnabled() || !fs.exists(new Path(metadataTablePath)) || queryFilters.isEmpty) {
+      Option.empty
+    } else {
+      val targetColStatsIndexColumns = Seq(
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
+
+      val requiredMetadataIndexColumns =
+        (targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
+          s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
+
+      // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
+      val metadataTableDF = spark.read.format("org.apache.hudi")
+        .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
+
+      // TODO filter on (column, partition) prefix
+      val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
+        .select(requiredMetadataIndexColumns.map(col): _*)

Review comment:
       Should the logic of fetching column stats into a DataFrame be incorporated into `BaseTableMetadata`, which already exposes a `getColumnStats()` API?  That way, the logic here could also be made metadata-table-agnostic, relying on BaseTableMetadata/HoodieTableMetadata to decide which source (`.hoodie/.colstatsindex` on the FS vs. the metadata table) to use.
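
       A rough Scala sketch of the kind of indirection suggested above (names are illustrative,
       not existing Hudi APIs); the real decision point could live in
       BaseTableMetadata/HoodieTableMetadata:

           import org.apache.spark.sql.{DataFrame, SparkSession}

           trait ColumnStatsSource {
             def loadColumnStats(spark: SparkSession, targetColumns: Seq[String]): DataFrame
           }

           object MetadataTableColumnStats extends ColumnStatsSource {
             // would read `<metadataTablePath>/column_stats` via the Hudi datasource
             def loadColumnStats(spark: SparkSession, targetColumns: Seq[String]): DataFrame = ???
           }

           object LegacyFsColumnStats extends ColumnStatsSource {
             // would read the latest table under `.hoodie/.colstatsindex`
             def loadColumnStats(spark: SparkSession, targetColumns: Seq[String]): DataFrame = ???
           }

           object ColumnStatsSource {
             def forTable(metadataTableEnabled: Boolean): ColumnStatsSource =
               if (metadataTableEnabled) MetadataTableColumnStats else LegacyFsColumnStats
           }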

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -194,77 +192,102 @@ case class HoodieFileIndex(spark: SparkSession,
    * @param queryFilters list of original data filters passed down from querying engine
    * @return list of pruned (data-skipped) candidate base-files' names
    */
-  private def lookupCandidateFilesInColStatsIndex(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
-    val indexPath = metaClient.getColumnStatsIndexPath
+  private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
     val fs = metaClient.getFs
+    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)
 
-    if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || queryFilters.isEmpty) {
-      // scalastyle:off return
-      return Success(Option.empty)
-      // scalastyle:on return
-    }
-
-    val completedCommits = getActiveTimeline.filterCompletedInstants().getInstants.iterator.asScala.toList.map(_.getTimestamp)
-
-    // Collect all index tables present in `.zindex` folder
-    val candidateIndexTables =
-      fs.listStatus(new Path(indexPath))
-        .filter(_.isDirectory)
-        .map(_.getPath.getName)
-        .filter(completedCommits.contains(_))
-        .sortBy(x => x)
-
-    if (candidateIndexTables.isEmpty) {
-      // scalastyle:off return
-      return Success(Option.empty)
-      // scalastyle:on return
-    }
-
-    val dataFrameOpt = try {
-      Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString))
-    } catch {
-      case t: Throwable =>
-        logError("Failed to read col-stats index; skipping", t)
-        None
+    if (!isDataSkippingEnabled() || !fs.exists(new Path(metadataTablePath)) || queryFilters.isEmpty) {
+      Option.empty
+    } else {
+      val targetColStatsIndexColumns = Seq(
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
+
+      val requiredMetadataIndexColumns =
+        (targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
+          s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
+
+      // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
+      val metadataTableDF = spark.read.format("org.apache.hudi")
+        .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
+
+      // TODO filter on (column, partition) prefix
+      val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
+        .select(requiredMetadataIndexColumns.map(col): _*)
+
+      val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
+
+      // Persist DF to avoid re-computing column statistics unraveling
+      withPersistence(colStatsDF) {
+        // Metadata Table bears rows in the following format
+        //
+        //  +---------------------------+------------+------------+------------+-------------+
+        //  |        fileName           | columnName |  minValue  |  maxValue  |  num_nulls  |
+        //  +---------------------------+------------+------------+------------+-------------+
+        //  | one_base_file.parquet     |          A |          1 |         10 |           0 |
+        //  | another_base_file.parquet |          A |        -10 |          0 |           5 |
+        //  +---------------------------+------------+------------+------------+-------------+
+        //
+        // While Data Skipping utils are expecting following (transposed) format, where per-column stats are
+        // essentially transposed (from rows to columns):
+        //
+        //  +---------------------------+------------+------------+-------------+
+        //  |          file             | A_minValue | A_maxValue | A_num_nulls |
+        //  +---------------------------+------------+------------+-------------+
+        //  | one_base_file.parquet     |          1 |         10 |           0 |
+        //  | another_base_file.parquet |        -10 |          0 |           5 |
+        //  +---------------------------+------------+------------+-------------+
+        //
+        // NOTE: Column Stats Index might potentially contain statistics for many columns (if not all), while
+        //       query at hand might only be referencing a handful of those. As such, we collect all the
+        //       column references from the filtering expressions, and only transpose records corresponding to the
+        //       columns referenced in those
+        val transposedColStatsDF =

Review comment:
       I guess the purpose of the transposition here is to adapt to the input expected by the existing data-skipping APIs?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org