Posted to commits@hudi.apache.org by ak...@apache.org on 2023/02/17 17:12:06 UTC

[hudi] branch master updated: [HUDI-5557]Avoid converting columns that are not indexed in CSI (#7672)

This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 96032b9b768 [HUDI-5557]Avoid converting columns that are not indexed in CSI (#7672)
96032b9b768 is described below

commit 96032b9b768cc0d4c23eb6a6afd5ce019d56826b
Author: rfyu <39...@users.noreply.github.com>
AuthorDate: Sat Feb 18 01:11:57 2023 +0800

    [HUDI-5557]Avoid converting columns that are not indexed in CSI (#7672)
    
    Avoid converting columns that are not indexed in CSI
    
    If we directly set the statistics of non-indexed columns to null instead of filtering those columns out, the pruned list of candidate base files computed from the Metadata Table's Column Stats Index may be wrong (a minimal sketch of the corrected contract follows the diffstat below).
    
    ---------
    Co-authored-by: yrf余若凡 <rf...@trip.com>
---
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  |  35 +++----
 .../org/apache/hudi/ColumnStatsIndexHelper.java    |   7 +-
 .../org/apache/hudi/TestDataSkippingUtils.scala    |   2 +-
 .../hudi/functional/TestColumnStatsIndex.scala     | 115 ++++++++++++++++++++-
 4 files changed, 131 insertions(+), 28 deletions(-)
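
At a glance, the change makes composeIndexSchema aware of which columns are actually indexed: only requested columns that belong to the indexed set contribute min/max/null-count fields, and the filtered column list is returned alongside the schema. Below is a minimal, simplified sketch of that contract (field-name suffixes and the method name are illustrative; the real method in ColumnStatsIndexSupport also prepends file-name and value-count fields and uses HoodieMetadataPayload constants):

    import org.apache.spark.sql.types._

    // Simplified sketch: only requested columns that are actually indexed by the
    // Column Stats Index contribute <col>_minValue / <col>_maxValue / <col>_nullCount
    // fields; the filtered column list is returned so per-file stats can be
    // aligned against it downstream.
    def composeIndexSchemaSketch(targetColumnNames: Seq[String],
                                 indexedColumns: Set[String],
                                 tableSchema: StructType): (StructType, Seq[String]) = {
      val targetIndexedColumns = targetColumnNames.filter(indexedColumns.contains)
      val statFields = targetIndexedColumns.flatMap { colName =>
        val field = tableSchema.fields.find(_.name == colName).get
        Seq(
          StructField(s"${colName}_minValue", field.dataType, nullable = true),
          StructField(s"${colName}_maxValue", field.dataType, nullable = true),
          StructField(s"${colName}_nullCount", LongType, nullable = true))
      }
      (StructType(statFields), targetIndexedColumns)
    }

For example, requesting Seq("c4", "c1") against indexedColumns = Set("c1", "c2", "c3") yields stats fields for c1 only and targetIndexedColumns == Seq("c1"), instead of all-null placeholder stats for c4.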

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 5cf7a5ec035..68f7a9f16c1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -209,12 +209,16 @@ class ColumnStatsIndexSupport(spark: SparkSession,
     // NOTE: We're sorting the columns to make sure final index schema matches layout
     //       of the transposed table
     val sortedTargetColumnsSet = TreeSet(queryColumns:_*)
-    val sortedTargetColumns = sortedTargetColumnsSet.toSeq
 
     // NOTE: This is a trick to avoid pulling all of [[ColumnStatsIndexSupport]] object into the lambdas'
     //       closures below
     val indexedColumns = this.indexedColumns
 
+    // NOTE: It's crucial to maintain appropriate ordering of the columns
+    //       matching table layout: hence, we cherry-pick individual columns
+    //       instead of simply filtering in the ones we're interested in the schema
+    val (indexSchema, targetIndexedColumns) = composeIndexSchema(sortedTargetColumnsSet.toSeq, indexedColumns, tableSchema)
+
     // Here we perform complex transformation which requires us to modify the layout of the rows
     // of the dataset, and therefore we rely on low-level RDD API to avoid incurring encoding/decoding
     // penalty of the [[Dataset]], since it's required to adhere to its schema at all times, while
@@ -257,7 +261,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
         // to align existing column-stats for individual file with the list of expected ones for the
         // whole transposed projection (a superset of all files)
         val columnRecordsMap = columnRecordsSeq.map(r => (r.getColumnName, r)).toMap
-        val alignedColStatRecordsSeq = sortedTargetColumns.map(columnRecordsMap.get)
+        val alignedColStatRecordsSeq = targetIndexedColumns.map(columnRecordsMap.get)
 
         val coalescedRowValuesSeq =
           alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName, valueCount)) {
@@ -267,31 +271,19 @@ class ColumnStatsIndexSupport(spark: SparkSession,
                   acc ++= Seq(colStatRecord.getMinValue, colStatRecord.getMaxValue, colStatRecord.getNullCount)
                 case None =>
                   // NOTE: This could occur in either of the following cases:
-                  //    1. Column is not indexed in Column Stats Index: in this case we won't be returning
-                  //       any statistics for such column (ie all stats will be null)
-                  //    2. Particular file does not have this particular column (which is indexed by Column Stats Index):
+                  //    1. Particular file does not have this particular column (which is indexed by Column Stats Index):
                   //       in this case we're assuming missing column to essentially contain exclusively
                   //       null values, we set min/max values as null and null-count to be equal to value-count (this
                   //       behavior is consistent with reading non-existent columns from Parquet)
                   //
                   // This is a way to determine current column's index without explicit iteration (we're adding 3 stats / column)
-                  val idx = acc.length / 3
-                  val colName = sortedTargetColumns(idx)
-                  val indexed = indexedColumns.contains(colName)
-
-                  val nullCount = if (indexed) valueCount else null
-
-                  acc ++= Seq(null, null, nullCount)
+                  acc ++= Seq(null, null, valueCount)
               }
           }
 
         Row(coalescedRowValuesSeq:_*)
       }))
 
-    // NOTE: It's crucial to maintain appropriate ordering of the columns
-    //       matching table layout: hence, we cherry-pick individual columns
-    //       instead of simply filtering in the ones we're interested in the schema
-    val indexSchema = composeIndexSchema(sortedTargetColumns, tableSchema)
     (transposedRows, indexSchema)
   }
 
@@ -382,21 +374,22 @@ object ColumnStatsIndexSupport {
   /**
    * @VisibleForTesting
    */
-  def composeIndexSchema(targetColumnNames: Seq[String], tableSchema: StructType): StructType = {
+  def composeIndexSchema(targetColumnNames: Seq[String], indexedColumns: Set[String], tableSchema: StructType): (StructType, Seq[String]) = {
     val fileNameField = StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME, StringType, nullable = true, Metadata.empty)
     val valueCountField = StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT, LongType, nullable = true, Metadata.empty)
 
-    val targetFields = targetColumnNames.map(colName => tableSchema.fields.find(f => f.name == colName).get)
+    val targetIndexedColumns = targetColumnNames.filter(indexedColumns.contains(_))
+    val targetIndexedFields = targetIndexedColumns.map(colName => tableSchema.fields.find(f => f.name == colName).get)
 
-    StructType(
-      targetFields.foldLeft(Seq(fileNameField, valueCountField)) {
+    (StructType(
+      targetIndexedFields.foldLeft(Seq(fileNameField, valueCountField)) {
         case (acc, field) =>
           acc ++ Seq(
             composeColumnStatStructType(field.name, HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, field.dataType),
             composeColumnStatStructType(field.name, HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, field.dataType),
             composeColumnStatStructType(field.name, HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, LongType))
       }
-    )
+    ), targetIndexedColumns)
   }
 
   @inline def getMinColumnNameFor(colName: String): String =
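
With the transposition above, the per-file fold no longer needs to special-case non-indexed columns: since the transposed schema only covers indexed columns, a missing per-file record can only mean the file lacks that (indexed) column, which is treated as all-null, consistent with reading a non-existent column from Parquet. A minimal, self-contained sketch of that alignment, assuming a per-file map of column stats (the method and parameter names are illustrative, not the actual Hudi helpers):

    // Illustrative sketch (not the actual Hudi method): align one file's stats
    // with the ordered list of indexed target columns.
    def coalesceFileStats(targetIndexedColumns: Seq[String],
                          fileStats: Map[String, (Any, Any, Long)],
                          valueCount: Long): Seq[Any] =
      targetIndexedColumns.flatMap { colName =>
        fileStats.get(colName) match {
          case Some((min, max, nullCount)) => Seq(min, max, nullCount)
          // Indexed column absent from this file: treat it as exclusively null.
          case None => Seq(null, null, valueCount)
        }
      }
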
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
index 7c9649d4499..7ba82931fb6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
@@ -237,9 +237,10 @@ public class ColumnStatsIndexHelper {
             .filter(Objects::nonNull);
 
     StructType indexSchema = ColumnStatsIndexSupport$.MODULE$.composeIndexSchema(
-        JavaConverters$.MODULE$.collectionAsScalaIterableConverter(columnNames).asScala().toSeq(),
-        StructType$.MODULE$.apply(orderedColumnSchemas)
-    );
+          JavaConverters$.MODULE$.collectionAsScalaIterableConverter(columnNames).asScala().toSeq(),
+          JavaConverters$.MODULE$.collectionAsScalaIterableConverter(columnNames).asScala().toSet(),
+          StructType$.MODULE$.apply(orderedColumnSchemas)
+    )._1;
 
     return sparkSession.createDataFrame(allMetaDataRDD, indexSchema);
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
index 63db2f52fc2..f995e484d49 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
@@ -83,7 +83,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSuppor
       )
     )
 
-  val indexSchema: StructType = composeIndexSchema(indexedCols, sourceTableSchema)
+  val (indexSchema: StructType, targetIndexedColumns:  Seq[String]) = composeIndexSchema(indexedCols, indexedCols.toSet, sourceTableSchema)
 
   @ParameterizedTest
   @MethodSource(Array(
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 9e674db3541..289d641e2e8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -32,7 +32,10 @@ import org.apache.hudi.functional.TestColumnStatsIndex.ColumnStatsTestCase
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, GreaterThan, Literal, Or}
 import org.apache.spark.sql.functions.typedLit
+import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
 import org.junit.jupiter.api._
@@ -244,7 +247,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
       // We have to include "c1", since we sort the expected outputs by this column
       val requestedColumns = Seq("c4", "c1")
 
-      val expectedColStatsSchema = composeIndexSchema(requestedColumns.sorted, sourceTableSchema)
+      val (expectedColStatsSchema, _) = composeIndexSchema(requestedColumns.sorted, targetColumnsToIndex.toSet, sourceTableSchema)
       // Match against expected column stats table
       val expectedColStatsIndexTableDf =
         spark.read
@@ -297,7 +300,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
 
       val requestedColumns = sourceTableSchema.fieldNames
 
-      val expectedColStatsSchema = composeIndexSchema(requestedColumns.sorted, sourceTableSchema)
+      val (expectedColStatsSchema, _) = composeIndexSchema(requestedColumns.sorted, targetColumnsToIndex.toSet, sourceTableSchema)
       val expectedColStatsIndexUpdatedDF =
         spark.read
           .schema(expectedColStatsSchema)
@@ -320,6 +323,103 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testTranslateQueryFiltersIntoColumnStatsIndexFilterExpr(shouldReadInMemory: Boolean): Unit = {
+    val targetColumnsToIndex = Seq("c1", "c2", "c3")
+
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+      HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> targetColumnsToIndex.mkString(",")
+    )
+
+    val opts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      RECORDKEY_FIELD.key -> "c1",
+      PRECOMBINE_FIELD.key -> "c1",
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+    ) ++ metadataOpts
+
+    val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json").toString
+
+    // NOTE: Schema here is provided for validation that the input date is in the appropriate format
+    val inputDF = spark.read.schema(sourceTableSchema).json(sourceJSONTablePath)
+
+    inputDF
+      .sort("c1")
+      .repartition(4, new Column("c1"))
+      .write
+      .format("hudi")
+      .options(opts)
+      .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+
+    val metadataConfig = HoodieMetadataConfig.newBuilder()
+      .fromProperties(toProperties(metadataOpts))
+      .build()
+
+    ////////////////////////////////////////////////////////////////////////
+    // NOTE: Partial CSI projection
+    //          Projection is requested for set of columns some of which are
+    //          NOT indexed by the CSI
+    ////////////////////////////////////////////////////////////////////////
+
+    // We have to include "c1", since we sort the expected outputs by this column
+    val requestedColumns = Seq("c4", "c1")
+    val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
+
+    ////////////////////////////////////////////////////////////////////////
+    // Query filter #1: c1 > 1 and c4 > 'c4 filed value'
+    //                  We should filter only for c1
+    ////////////////////////////////////////////////////////////////////////
+    {
+      val andConditionFilters = Seq(
+        GreaterThan(AttributeReference("c1", IntegerType, nullable = true)(), Literal(1)),
+        GreaterThan(AttributeReference("c4", StringType, nullable = true)(), Literal("c4 filed value"))
+      )
+
+      val expectedAndConditionIndexedFilter = And(
+        GreaterThan(UnresolvedAttribute("c1_maxValue"), Literal(1)),
+        Literal(true)
+      )
+
+      columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { partialTransposedColStatsDF =>
+        val andConditionActualFilter = andConditionFilters.map(translateIntoColumnStatsIndexFilterExpr(_, partialTransposedColStatsDF.schema))
+          .reduce(And)
+        assertEquals(expectedAndConditionIndexedFilter, andConditionActualFilter)
+      }
+    }
+
+    ////////////////////////////////////////////////////////////////////////
+    // Query filter #2: c1 > 1 or c4 > 'c4 filed value'
+    //                  Since c4 is not indexed, we cannot filter any data
+    ////////////////////////////////////////////////////////////////////////
+    {
+      val orConditionFilters = Seq(
+        Or(GreaterThan(AttributeReference("c1", IntegerType, nullable = true)(), Literal(1)),
+          GreaterThan(AttributeReference("c4", StringType, nullable = true)(), Literal("c4 filed value")))
+      )
+
+      val expectedOrConditionIndexedFilter = Or(
+        GreaterThan(UnresolvedAttribute("c1_maxValue"), Literal(1)),
+        Literal(true)
+      )
+
+      columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { partialTransposedColStatsDF =>
+        val orConditionActualFilter = orConditionFilters.map(translateIntoColumnStatsIndexFilterExpr(_, partialTransposedColStatsDF.schema))
+          .reduce(And)
+        assertEquals(expectedOrConditionIndexedFilter, orConditionActualFilter)
+      }
+    }
+  }
+
   @Test
   def testParquetMetadataRangeExtraction(): Unit = {
     val df = generateRandomDataFrame(spark)
@@ -415,6 +515,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
           s"sum(1) AS valueCount" +:
             df.columns
               .filter(col => includedCols.contains(col))
+              .filter(col => indexedCols.contains(col))
               .flatMap(col => {
                 val minColName = s"${col}_minValue"
                 val maxColName = s"${col}_maxValue"
@@ -450,7 +551,15 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
 
     val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
 
-    val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
+    val indexedColumns: Set[String] = {
+      val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
+      if (customIndexedColumns.isEmpty) {
+        sourceTableSchema.fieldNames.toSet
+      } else {
+        customIndexedColumns.asScala.toSet
+      }
+    }
+    val (expectedColStatsSchema, _) = composeIndexSchema(sourceTableSchema.fieldNames, indexedColumns, sourceTableSchema)
     val validationSortColumns = Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue")
 
     columnStatsIndex.loadTransposed(sourceTableSchema.fieldNames, testCase.shouldReadInMemory) { transposedColStatsDF =>
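
The new test above pins down the pruning semantics this fix restores: a predicate over a column that is not covered by the Column Stats Index is translated to a constant true, i.e. "the stats cannot rule this file out". Under that assumption, a rough sketch of the per-file decision (plain Scala, not Hudi API) shows why an AND of an indexed and a non-indexed predicate can still prune on the indexed column, while an OR over a non-indexed column can never prune:

    // Rough sketch of the per-file candidate check for "c1 > 1 AND/OR c4 > 'x'"
    // when only c1 is indexed and the c4 predicate degrades to `true`:
    def survivesAnd(c1MaxValue: Int): Boolean = (c1MaxValue > 1) && true  // prune when c1_maxValue <= 1
    def survivesOr(c1MaxValue: Int): Boolean  = (c1MaxValue > 1) || true  // always true: nothing can be pruned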