You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/04/11 15:48:48 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5275: [HUDI-3841] Fixing Column Stats in the presence of Schema Evolution

alexeykudinkin commented on code in PR #5275:
URL: https://github.com/apache/hudi/pull/5275#discussion_r847484362


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala:
##########
@@ -113,59 +114,88 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
    *
    * @param spark Spark session ref
    * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
-   * @param targetColumns target columns to be included into the final table
+   * @param queryColumns target columns to be included into the final table
    * @param tableSchema schema of the source data table
    * @return reshaped table according to the format outlined above
    */
-  def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, targetColumns: Seq[String], tableSchema: StructType): DataFrame = {
+  def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
     val colStatsSchema = colStatsDF.schema
     val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
       case (field, ordinal) => (field.name, ordinal)
     }).toMap
 
     val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
 
-    // NOTE: We're sorting the columns to make sure final index schema matches layout
-    //       of the transposed table
-    val sortedColumns = TreeSet(targetColumns: _*)
-
     val colNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
     val minValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
     val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
     val fileNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
     val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
     val valueCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
 
-    val transposedRDD = colStatsDF.rdd
-      .filter(row => sortedColumns.contains(row.getString(colNameOrdinal)))
-      .map { row =>
-        val (minValue, _) = tryUnpackNonNullVal(row.getAs[Row](minValueOrdinal))
-        val (maxValue, _) = tryUnpackNonNullVal(row.getAs[Row](maxValueOrdinal))
-
-        val colName = row.getString(colNameOrdinal)
-        val colType = tableSchemaFieldMap(colName).dataType
+    // NOTE: We have to collect list of indexed columns to make sure we properly align the rows
+    //       w/in the transposed dataset: since some files might not have all of the columns indexed
+    //       either due to the Column Stats Index config changes, schema evolution, etc, we have
+    //       to make sure that all of the rows w/in transposed data-frame are properly padded (with null
+    //       values) for such file-column combinations
+    val indexedColumns: Seq[String] = colStatsDF.rdd.map(row => row.getString(colNameOrdinal)).distinct().collect()

Review Comment:
   `colStatsDF` is populated correctly -- it bears 1 row / column (let's call it "row-based"), therefore for all columns in a file we will have N rows corresponding to it (eq to the # of columns in that file).
   
   Transposed table is "column-based", ie there's 1 row / file and each column's stat is mapped to a column in such view. Therefore only in that view we have a need to align the rows (to pad them).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org