Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/04/11 07:02:45 UTC

[GitHub] [hudi] codope commented on a diff in pull request #5275: [HUDI-3841] Fixing Column Stats in the presence of Schema Evolution

codope commented on code in PR #5275:
URL: https://github.com/apache/hudi/pull/5275#discussion_r846988154


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala:
##########
@@ -113,59 +114,88 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
    *
    * @param spark Spark session ref
    * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
-   * @param targetColumns target columns to be included into the final table
+   * @param queryColumns target columns to be included into the final table
    * @param tableSchema schema of the source data table
    * @return reshaped table according to the format outlined above
    */
-  def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, targetColumns: Seq[String], tableSchema: StructType): DataFrame = {
+  def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
     val colStatsSchema = colStatsDF.schema
     val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
       case (field, ordinal) => (field.name, ordinal)
     }).toMap
 
     val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
 
-    // NOTE: We're sorting the columns to make sure final index schema matches layout
-    //       of the transposed table
-    val sortedColumns = TreeSet(targetColumns: _*)
-
     val colNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
     val minValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
     val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
     val fileNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
     val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
     val valueCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
 
-    val transposedRDD = colStatsDF.rdd
-      .filter(row => sortedColumns.contains(row.getString(colNameOrdinal)))
-      .map { row =>
-        val (minValue, _) = tryUnpackNonNullVal(row.getAs[Row](minValueOrdinal))
-        val (maxValue, _) = tryUnpackNonNullVal(row.getAs[Row](maxValueOrdinal))
-
-        val colName = row.getString(colNameOrdinal)
-        val colType = tableSchemaFieldMap(colName).dataType
+    // NOTE: We have to collect list of indexed columns to make sure we properly align the rows
+    //       w/in the transposed dataset: since some files might not have all of the columns indexed
+    //       either due to the Column Stats Index config changes, schema evolution, etc, we have
+    //       to make sure that all of the rows w/in transposed data-frame are properly padded (with null
+    //       values) for such file-column combinations
+    val indexedColumns: Seq[String] = colStatsDF.rdd.map(row => row.getString(colNameOrdinal)).distinct().collect()

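The padding that the new NOTE describes amounts to collecting the union of indexed columns across all files and filling in nulls for any file/column pair that has no stats. A minimal, self-contained sketch of that idea (the case class and field names below are illustrative only, not the actual Hudi types):

```scala
// Illustrative sketch of the padding behaviour described in the NOTE above.
// Names and shapes are placeholders; they do not mirror the actual Hudi classes.
object ColumnStatsPaddingSketch {
  // Per-file stats for a single column, as they might arrive from the raw index.
  case class ColStat(fileName: String, colName: String, min: Any, max: Any)

  def main(args: Array[String]): Unit = {
    // Two files: "f2" was written after a schema change and has no stats for "c2".
    val rawStats = Seq(
      ColStat("f1", "c1", 1, 10), ColStat("f1", "c2", "a", "z"),
      ColStat("f2", "c1", 5, 20)
    )

    // Collect the full set of indexed columns across all files...
    val indexedColumns = rawStats.map(_.colName).distinct.sorted

    // ...then transpose to one row per file, padding missing columns with nulls
    // so every row in the transposed layout has the same shape.
    val transposed = rawStats.groupBy(_.fileName).map { case (file, stats) =>
      val byCol = stats.map(s => s.colName -> s).toMap
      val cells = indexedColumns.map(c => byCol.get(c).map(s => (s.min, s.max)).orNull)
      file -> cells
    }

    transposed.toSeq.sortBy(_._1).foreach { case (file, cells) =>
      println(s"$file -> ${cells.mkString(", ")}")
    }
    // f1 -> (1,10), (a,z)
    // f2 -> (5,20), null   <- padded for the un-indexed column "c2"
  }
}
```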
Review Comment:
   Why not do this one level above in `readColumnStatsIndex` so that `colStatsDF` itself is correctly populated and `transposeColumnStatsIndex` simply transposes as today?
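A rough sketch of what that suggestion could look like, assuming the padding is done when the raw stats are read: scaffold every file/column combination and left-join the stats onto it, so the transpose step stays a pure pivot. The helper and the "fileName"/"columnName" column names are placeholders, not the actual Hudi metadata payload schema.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical read-time padding; column names are placeholders and do not
// mirror the actual Hudi metadata payload schema.
object ReadTimePaddingSketch {
  def padColumnStats(rawColStatsDF: DataFrame): DataFrame = {
    // Every (file, column) combination that should be present after padding.
    val scaffold = rawColStatsDF.select("fileName").distinct()
      .crossJoin(rawColStatsDF.select("columnName").distinct())

    // A left join keeps all combinations; stats missing for a given file/column
    // pair come back as nulls, which is exactly the padding the transposed
    // layout needs, so the transpose step does not have to align rows itself.
    scaffold.join(rawColStatsDF, Seq("fileName", "columnName"), "left_outer")
  }
}
```

This would keep the null-padding concern in one place, at the cost of materializing the file-by-column scaffold during the read.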



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org