Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/24 01:29:02 UTC

[GitHub] [hudi] XuQianJin-Stars commented on a diff in pull request #5746: [HUDI-4250][HUDI-4202] Optimize performance of Column Stats Index reading in Data Skipping

XuQianJin-Stars commented on code in PR #5746:
URL: https://github.com/apache/hudi/pull/5746#discussion_r928182362


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala:
##########
@@ -112,154 +195,186 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
    *       column references from the filtering expressions, and only transpose records corresponding to the
    *       columns referenced in those
    *
-   * @param spark Spark session ref
-   * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
+   * @param colStatsRecords [[HoodieData[HoodieMetadataColumnStats]]] bearing raw Column Stats Index records
    * @param queryColumns target columns to be included into the final table
-   * @param tableSchema schema of the source data table
    * @return reshaped table according to the format outlined above
    */
-  def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
-    val colStatsSchema = colStatsDF.schema
-    val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
-      case (field, ordinal) => (field.name, ordinal)
-    }).toMap
-
+  private def transpose(colStatsRecords: HoodieData[HoodieMetadataColumnStats], queryColumns: Seq[String]): (HoodieData[Row], StructType) = {
     val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
-
-    val colNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
-    val minValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
-    val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
-    val fileNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-    val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
-    val valueCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
-
-    // NOTE: We have to collect list of indexed columns to make sure we properly align the rows
-    //       w/in the transposed dataset: since some files might not have all of the columns indexed
-    //       either due to the Column Stats Index config changes, schema evolution, etc, we have
-    //       to make sure that all of the rows w/in transposed data-frame are properly padded (with null
-    //       values) for such file-column combinations
-    val indexedColumns: Seq[String] = colStatsDF.rdd.map(row => row.getString(colNameOrdinal)).distinct().collect()
-
     // NOTE: We're sorting the columns to make sure final index schema matches layout
     //       of the transposed table
-    val sortedTargetColumns = TreeSet(queryColumns.intersect(indexedColumns): _*)
-
-    val transposedRDD = colStatsDF.rdd
-      .filter(row => sortedTargetColumns.contains(row.getString(colNameOrdinal)))
-      .map { row =>
-        if (row.isNullAt(minValueOrdinal) && row.isNullAt(maxValueOrdinal)) {
+    val sortedTargetColumnsSet = TreeSet(queryColumns:_*)
+    val sortedTargetColumns = sortedTargetColumnsSet.toSeq
+
+    // NOTE: This is a trick to avoid pulling all of [[ColumnStatsIndexSupport]] object into the lambdas'
+    //       closures below
+    val indexedColumns = this.indexedColumns
+
+    // Here we perform complex transformation which requires us to modify the layout of the rows
+    // of the dataset, and therefore we rely on low-level RDD API to avoid incurring encoding/decoding
+    // penalty of the [[Dataset]], since it's required to adhere to its schema at all times, while
+    // RDDs are not;
+    val transposedRows: HoodieData[Row] = colStatsRecords
+      // NOTE: Explicit conversion is required for Scala 2.11
+      .filter(JFunction.toJavaSerializableFunction(r => sortedTargetColumnsSet.contains(r.getColumnName)))
+      .mapToPair(JFunction.toJavaSerializablePairFunction(r => {
+        if (r.getMinValue == null && r.getMaxValue == null) {
           // Corresponding row could be null in either of the 2 cases
           //    - Column contains only null values (in that case both min/max have to be nulls)
           //    - This is a stubbed Column Stats record (used as a tombstone)
-          row
+          collection.Pair.of(r.getFileName, r)
         } else {
-          val minValueStruct = row.getAs[Row](minValueOrdinal)
-          val maxValueStruct = row.getAs[Row](maxValueOrdinal)
+          val minValueWrapper = r.getMinValue
+          val maxValueWrapper = r.getMaxValue
 
-          checkState(minValueStruct != null && maxValueStruct != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null")
+          checkState(minValueWrapper != null && maxValueWrapper != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null")
 
-          val colName = row.getString(colNameOrdinal)
+          val colName = r.getColumnName
           val colType = tableSchemaFieldMap(colName).dataType
 
-          val (minValue, _) = tryUnpackNonNullVal(minValueStruct)
-          val (maxValue, _) = tryUnpackNonNullVal(maxValueStruct)
-          val rowValsSeq = row.toSeq.toArray
+          val minValue = deserialize(tryUnpackValueWrapper(minValueWrapper), colType)
+          val maxValue = deserialize(tryUnpackValueWrapper(maxValueWrapper), colType)
+
           // Update min-/max-value structs w/ unwrapped values in-place
-          rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
-          rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
+          r.setMinValue(minValue)
+          r.setMaxValue(maxValue)
 
-          Row(rowValsSeq: _*)
+          collection.Pair.of(r.getFileName, r)
         }
-      }
-      .groupBy(r => r.getString(fileNameOrdinal))
-      .foldByKey(Seq[Row]()) {
-        case (_, columnRowsSeq) =>
-          // Rows seq is always non-empty (otherwise it won't be grouped into)
-          val fileName = columnRowsSeq.head.get(fileNameOrdinal)
-          val valueCount = columnRowsSeq.head.get(valueCountOrdinal)
-
-          // To properly align individual rows (corresponding to a file) w/in the transposed projection, we need
-          // to align existing column-stats for individual file with the list of expected ones for the
-          // whole transposed projection (a superset of all files)
-          val columnRowsMap = columnRowsSeq.map(row => (row.getString(colNameOrdinal), row)).toMap
-          val alignedColumnRowsSeq = sortedTargetColumns.toSeq.map(columnRowsMap.get)
-
-          val coalescedRowValuesSeq =
-            alignedColumnRowsSeq.foldLeft(Seq[Any](fileName, valueCount)) {
-              case (acc, opt) =>
-                opt match {
-                  case Some(columnStatsRow) =>
-                    acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnStatsRow.get(ord))
-                  case None =>
-                    // NOTE: Since we're assuming missing column to essentially contain exclusively
-                    //       null values, we set null-count to be equal to value-count (this behavior is
-                    //       consistent with reading non-existent columns from Parquet)
-                    acc ++ Seq(null, null, valueCount)
-                }
-            }
-
-          Seq(Row(coalescedRowValuesSeq:_*))
-      }
-      .values
-      .flatMap(it => it)
+      }))
+      .groupByKey()
+      .map(JFunction.toJavaSerializableFunction(p => {
+        val columnRecordsSeq: Seq[HoodieMetadataColumnStats] = p.getValue.asScala.toSeq
+        val fileName: String = p.getKey
+        val valueCount: Long = columnRecordsSeq.head.getValueCount
+
+        // To properly align individual rows (corresponding to a file) w/in the transposed projection, we need
+        // to align existing column-stats for individual file with the list of expected ones for the
+        // whole transposed projection (a superset of all files)
+        val columnRecordsMap = columnRecordsSeq.map(r => (r.getColumnName, r)).toMap
+        val alignedColStatRecordsSeq = sortedTargetColumns.map(columnRecordsMap.get)
+
+        val coalescedRowValuesSeq =
+          alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName, valueCount)) {
+            case (acc, opt) =>
+              opt match {
+                case Some(colStatRecord) =>
+                  acc ++= Seq(colStatRecord.getMinValue, colStatRecord.getMaxValue, colStatRecord.getNullCount)
+                case None =>
+                  // NOTE: This could occur in either of the following cases:
+                  //    1. Column is not indexed in Column Stats Index: in this case we won't be returning
+                  //       any statistics for such column (ie all stats will be null)
+                  //    2. Particular file does not have this particular column (which is indexed by Column Stats Index):
+                  //       in this case we're assuming missing column to essentially contain exclusively
+                  //       null values, we set min/max values as null and null-count to be equal to value-count (this
+                  //       behavior is consistent with reading non-existent columns from Parquet)
+                  //
+                  // This is a way to determine current column's index without explicit iteration (we're adding 3 stats / column)
+                  val idx = acc.length / 3
+                  val colName = sortedTargetColumns(idx)
+                  val indexed = indexedColumns.contains(colName)
+
+                  val nullCount = if (indexed) valueCount else null
+
+                  acc ++= Seq(null, null, nullCount)
+              }
+          }
+
+        Row(coalescedRowValuesSeq:_*)
+      }))
 
     // NOTE: It's crucial to maintain appropriate ordering of the columns
     //       matching table layout: hence, we cherry-pick individual columns
     //       instead of simply filtering in the ones we're interested in the schema
-    val indexSchema = composeIndexSchema(sortedTargetColumns.toSeq, tableSchema)
+    val indexSchema = composeIndexSchema(sortedTargetColumns, tableSchema)
+    (transposedRows, indexSchema)
+  }
 
-    spark.createDataFrame(transposedRDD, indexSchema)
+  private def loadColumnStatsIndexForColumnsInternal(targetColumns: Seq[String], shouldReadInMemory: Boolean): DataFrame = {
+    val colStatsDF = {
+      val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
+      // NOTE: Explicit conversion is required for Scala 2.11
+      val catalystRows: HoodieData[InternalRow] = colStatsRecords.mapPartitions(JFunction.toJavaSerializableFunction(it => {
+        val converter = AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$, columnStatsRecordStructType)
+        it.asScala.map(r => converter(r).orNull).asJava
+      }), false)
+
+      if (shouldReadInMemory) {
+        // NOTE: This will instantiate a [[Dataset]] backed by [[LocalRelation]] holding all of the rows
+        //       of the transposed table in memory, facilitating execution of the subsequently chained operations
+        //       on it locally (on the driver; all such operations are actually going to be performed by Spark's
+        //       Optimizer)
+        createDataFrameFromInternalRows(spark, catalystRows.collectAsList().asScala, columnStatsRecordStructType)
+      } else {
+        createDataFrameFromRDD(spark, HoodieJavaRDD.getJavaRDD(catalystRows), columnStatsRecordStructType)
+      }
+    }
+
+    colStatsDF.select(targetColumnStatsIndexColumns.map(col): _*)
+  }
+
+  private def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
+    // Read Metadata Table's Column Stats Index records into [[HoodieData]] container by
+    //    - Fetching the records from CSI by key-prefixes (encoded column names)
+    //    - Extracting [[HoodieMetadataColumnStats]] records
+    //    - Filtering out nulls
+    checkState(targetColumns.nonEmpty)
+
+    // TODO encoding should be done internally w/in HoodieBackedTableMetadata
+    val encodedTargetColumnNames = targetColumns.map(colName => new ColumnIndexID(colName).asBase64EncodedString())
+
+    val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
+      metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory)
+
+    val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
+      // NOTE: Explicit conversion is required for Scala 2.11
+      metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
+        toScalaOption(record.getData.getInsertValue(null, null))
+          .map(metadataRecord => metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
+          .orNull
+      }))
+        .filter(JFunction.toJavaSerializableFunction(columnStatsRecord => columnStatsRecord != null))
+
+    columnStatsRecords
   }
 
-  private def readFullColumnStatsIndexInternal(spark: SparkSession, metadataConfig: HoodieMetadataConfig, tableBasePath: String): DataFrame = {
-    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath)
+  private def loadFullColumnStatsIndexInternal(): DataFrame = {
+    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2.toString)
     // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
-    spark.read.format("org.apache.hudi")
+    val colStatsDF = spark.read.format("org.apache.hudi")
       .options(metadataConfig.getProps.asScala)
       .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
-  }
-
-  private def readColumnStatsIndexForColumnsInternal(spark: SparkSession, targetColumns: Seq[String], metadataConfig: HoodieMetadataConfig, tableBasePath: String) = {
-    val ctx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
 
-    // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]] by
-    //    - Fetching the records from CSI by key-prefixes (encoded column names)
-    //    - Deserializing fetched records into [[InternalRow]]s
-    //    - Composing [[DataFrame]]
-    val metadataTableDF = {
-      val metadataTable = HoodieTableMetadata.create(ctx, metadataConfig, tableBasePath, FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
-
-      // TODO encoding should be done internally w/in HoodieBackedTableMetadata
-      val encodedTargetColumnNames = targetColumns.map(colName => new ColumnIndexID(colName).asBase64EncodedString())
-
-      val recordsRDD: RDD[HoodieRecord[HoodieMetadataPayload]] =
-        HoodieJavaRDD.getJavaRDD(
-          metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
-        )
-
-      val catalystRowsRDD: RDD[InternalRow] = recordsRDD.mapPartitions { it =>
-        val metadataRecordSchema = new Parser().parse(metadataRecordSchemaString)
-        val converter = AvroConversionUtils.createAvroToInternalRowConverter(metadataRecordSchema, metadataRecordStructType)
-
-        it.map { record =>
-          // schema and props are ignored for generating metadata record from the payload
-          // instead, the underlying file system, or bloom filter, or columns stats metadata (part of payload) are directly used
-          toScalaOption(record.getData.getInsertValue(null, null))
-            .flatMap(avroRecord => converter(avroRecord.asInstanceOf[GenericRecord]))
-            .orNull
-        }
-      }
+    val requiredIndexColumns =
+      targetColumnStatsIndexColumns.map(colName =>
+        col(s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}"))
 
-      HoodieUnsafeRDDUtils.createDataFrame(spark, catalystRowsRDD, metadataRecordStructType)
-    }
-    metadataTableDF
+    colStatsDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
+      .select(requiredIndexColumns: _*)
   }
 }
 
 object ColumnStatsIndexSupport {
 
-  private val metadataRecordSchemaString: String = HoodieMetadataRecord.SCHEMA$.toString
-  private val metadataRecordStructType: StructType = AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataRecord.SCHEMA$)
+  private val columnStatsIndexProjectionSizeInMemoryThreshold = 100000
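
(A simplified illustration of the null-padding behaviour described in the transposition comments above, using plain Scala collections only; the record type, field names and helper below are made up for the sketch, while the real code operates on `HoodieMetadataColumnStats` records held in `HoodieData`.)

```scala
object TransposeSketch {
  // One Column Stats record: stats for a single (file, column) pair.
  case class ColStatSketch(fileName: String, columnName: String,
                           minValue: Any, maxValue: Any, nullCount: Long, valueCount: Long)

  // Transpose per-(file, column) records into one row per file, padding columns that have
  // no record for a given file. For an indexed-but-missing column the null-count is set to
  // the file's value-count (mirroring how a non-existent column reads back from Parquet as
  // all nulls); for a column that is not indexed at all, every stat stays null.
  def transposeSketch(records: Seq[ColStatSketch],
                      queryColumns: Seq[String],
                      indexedColumns: Set[String]): Seq[Seq[Any]] = {
    val sortedTargetColumns = queryColumns.sorted
    records.groupBy(_.fileName).toSeq.map { case (fileName, colStats) =>
      val byColumn = colStats.map(s => s.columnName -> s).toMap
      val valueCount = colStats.head.valueCount
      sortedTargetColumns.foldLeft(Seq[Any](fileName, valueCount)) { (acc, colName) =>
        byColumn.get(colName) match {
          case Some(s) => acc ++ Seq(s.minValue, s.maxValue, s.nullCount)
          case None =>
            val nullCount: Any = if (indexedColumns.contains(colName)) valueCount else null
            acc ++ Seq(null, null, nullCount) // pad the missing column
        }
      }
    }
  }
}
```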

Review Comment:
   > could you explain a little why `columnStatsIndexProjectionSizeInMemoryThreshold` is set to 100,000?

   Is it possible to make this configurable via a config parameter with a default value?
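
   For illustration only: if the threshold were made configurable, it could be expressed with Hudi's `ConfigProperty` builder as used in its other config classes; the key name, default and placement below are hypothetical and not part of this PR.

```scala
import org.apache.hudi.common.config.ConfigProperty

object ColumnStatsIndexConfigSketch {
  // Hypothetical key and default; shown only to illustrate the reviewer's suggestion.
  val COLUMN_STATS_PROJECTION_IN_MEMORY_THRESHOLD: ConfigProperty[Integer] =
    ConfigProperty
      .key("hoodie.metadata.index.column.stats.inmemory.projection.threshold")
      .defaultValue(Integer.valueOf(100000))
      .withDocumentation("Maximum number of Column Stats Index records that will be projected " +
        "in memory on the driver; larger projections would fall back to an RDD-backed read.")
}
```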



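As a rough sketch of how such a threshold might gate the two read paths introduced in the diff (collect to the driver vs. stay on the RDD-backed path); the helper name and sizing heuristic here are made up, and the PR's actual gating logic may differ:

```scala
// Hypothetical helper; purely illustrative.
def shouldReadInMemorySketch(targetColumns: Seq[String],
                             estimatedColStatsRecordCount: Long,
                             inMemoryThreshold: Long = 100000L): Boolean = {
  // Only a column-pruned projection that is small enough is worth collecting onto
  // the driver (backed by a LocalRelation); anything larger stays distributed.
  targetColumns.nonEmpty && estimatedColStatsRecordCount <= inMemoryThreshold
}
```
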
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org