Posted to commits@hudi.apache.org by ak...@apache.org on 2023/02/17 17:12:06 UTC
[hudi] branch master updated: [HUDI-5557] Avoid converting columns that are not indexed in CSI (#7672)
This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 96032b9b768 [HUDI-5557] Avoid converting columns that are not indexed in CSI (#7672)
96032b9b768 is described below
commit 96032b9b768cc0d4c23eb6a6afd5ce019d56826b
Author: rfyu <39...@users.noreply.github.com>
AuthorDate: Sat Feb 18 01:11:57 2023 +0800
[HUDI-5557] Avoid converting columns that are not indexed in CSI (#7672)
Avoid converting columns that are not indexed in CSI
If we directly set the statistics of non-indexed columns to null instead of filtering those columns out, the pruned list of candidate base-files computed against the Metadata Table's Column Stats Index may be wrong.
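To illustrate the intent, here is a hedged sketch (not the committed code) of a simplified composeIndexSchema that drops non-indexed columns from the transposed projection instead of emitting all-null statistics for them; the fileName/valueCount field names are stand-ins for the HoodieMetadataPayload constants used in the actual patch:

    import org.apache.spark.sql.types._

    // Simplified sketch: keep only indexed columns, preserving the requested order.
    def composeIndexSchemaSketch(targetColumnNames: Seq[String],
                                 indexedColumns: Set[String],
                                 tableSchema: StructType): (StructType, Seq[String]) = {
      val targetIndexedColumns = targetColumnNames.filter(indexedColumns.contains)
      val statFields = targetIndexedColumns.flatMap { colName =>
        val dataType = tableSchema(colName).dataType
        Seq(
          StructField(s"${colName}_minValue", dataType, nullable = true),
          StructField(s"${colName}_maxValue", dataType, nullable = true),
          StructField(s"${colName}_nullCount", LongType, nullable = true))
      }
      (StructType(StructField("fileName", StringType) +:
        StructField("valueCount", LongType) +: statFields),
        targetIndexedColumns)
    }

    // With indexedColumns = Set("c1"), requesting Seq("c1", "c4") yields a schema with
    // only c1_{minValue,maxValue,nullCount}. Before the fix, c4 stats appeared as
    // all-null columns, which data skipping could misread when evaluating c4 filters.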
---------
Co-authored-by: yrf余若凡 <rf...@trip.com>
---
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 35 +++----
.../org/apache/hudi/ColumnStatsIndexHelper.java | 7 +-
.../org/apache/hudi/TestDataSkippingUtils.scala | 2 +-
.../hudi/functional/TestColumnStatsIndex.scala | 115 ++++++++++++++++++++-
4 files changed, 131 insertions(+), 28 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 5cf7a5ec035..68f7a9f16c1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -209,12 +209,16 @@ class ColumnStatsIndexSupport(spark: SparkSession,
// NOTE: We're sorting the columns to make sure final index schema matches layout
// of the transposed table
val sortedTargetColumnsSet = TreeSet(queryColumns:_*)
- val sortedTargetColumns = sortedTargetColumnsSet.toSeq
// NOTE: This is a trick to avoid pulling all of [[ColumnStatsIndexSupport]] object into the lambdas'
// closures below
val indexedColumns = this.indexedColumns
+ // NOTE: It's crucial to maintain appropriate ordering of the columns
+ // matching table layout: hence, we cherry-pick individual columns
+ // instead of simply filtering the schema down to the ones we're interested in
+ val (indexSchema, targetIndexedColumns) = composeIndexSchema(sortedTargetColumnsSet.toSeq, indexedColumns, tableSchema)
+
// Here we perform complex transformation which requires us to modify the layout of the rows
// of the dataset, and therefore we rely on low-level RDD API to avoid incurring encoding/decoding
// penalty of the [[Dataset]], since it's required to adhere to its schema at all times, while
@@ -257,7 +261,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
// to align existing column-stats for individual file with the list of expected ones for the
// whole transposed projection (a superset of all files)
val columnRecordsMap = columnRecordsSeq.map(r => (r.getColumnName, r)).toMap
- val alignedColStatRecordsSeq = sortedTargetColumns.map(columnRecordsMap.get)
+ val alignedColStatRecordsSeq = targetIndexedColumns.map(columnRecordsMap.get)
val coalescedRowValuesSeq =
alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName, valueCount)) {
@@ -267,31 +271,19 @@ class ColumnStatsIndexSupport(spark: SparkSession,
acc ++= Seq(colStatRecord.getMinValue, colStatRecord.getMaxValue, colStatRecord.getNullCount)
case None =>
// NOTE: This could occur in either of the following cases:
- // 1. Column is not indexed in Column Stats Index: in this case we won't be returning
- // any statistics for such column (ie all stats will be null)
- // 2. Particular file does not have this particular column (which is indexed by Column Stats Index):
+ // 1. Particular file does not have this particular column (which is indexed by Column Stats Index):
// in this case we're assuming missing column to essentially contain exclusively
// null values, we set min/max values as null and null-count to be equal to value-count (this
// behavior is consistent with reading non-existent columns from Parquet)
//
// This is a way to determine current column's index without explicit iteration (we're adding 3 stats / column)
- val idx = acc.length / 3
- val colName = sortedTargetColumns(idx)
- val indexed = indexedColumns.contains(colName)
-
- val nullCount = if (indexed) valueCount else null
-
- acc ++= Seq(null, null, nullCount)
+ acc ++= Seq(null, null, valueCount)
}
}
Row(coalescedRowValuesSeq:_*)
}))
- // NOTE: It's crucial to maintain appropriate ordering of the columns
- // matching table layout: hence, we cherry-pick individual columns
- // instead of simply filtering in the ones we're interested in the schema
- val indexSchema = composeIndexSchema(sortedTargetColumns, tableSchema)
(transposedRows, indexSchema)
}
@@ -382,21 +374,22 @@ object ColumnStatsIndexSupport {
/**
* @VisibleForTesting
*/
- def composeIndexSchema(targetColumnNames: Seq[String], tableSchema: StructType): StructType = {
+ def composeIndexSchema(targetColumnNames: Seq[String], indexedColumns: Set[String], tableSchema: StructType): (StructType, Seq[String]) = {
val fileNameField = StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME, StringType, nullable = true, Metadata.empty)
val valueCountField = StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT, LongType, nullable = true, Metadata.empty)
- val targetFields = targetColumnNames.map(colName => tableSchema.fields.find(f => f.name == colName).get)
+ val targetIndexedColumns = targetColumnNames.filter(indexedColumns.contains(_))
+ val targetIndexedFields = targetIndexedColumns.map(colName => tableSchema.fields.find(f => f.name == colName).get)
- StructType(
- targetFields.foldLeft(Seq(fileNameField, valueCountField)) {
+ (StructType(
+ targetIndexedFields.foldLeft(Seq(fileNameField, valueCountField)) {
case (acc, field) =>
acc ++ Seq(
composeColumnStatStructType(field.name, HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, field.dataType),
composeColumnStatStructType(field.name, HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, field.dataType),
composeColumnStatStructType(field.name, HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, LongType))
}
- )
+ ), targetIndexedColumns)
}
@inline def getMinColumnNameFor(colName: String): String =
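To make the transposition change above concrete, here is a minimal, self-contained sketch of the row-coalescing step after the fix, assuming every column in targetIndexedColumns is covered by the index; ColStat is an illustrative stand-in, not a real Hudi type:

    // Illustrative record: the real code reads HoodieMetadataColumnStats instances.
    case class ColStat(columnName: String, minValue: Any, maxValue: Any, nullCount: Long)

    def coalesceRow(fileName: String,
                    valueCount: Long,
                    targetIndexedColumns: Seq[String],
                    fileColStats: Seq[ColStat]): Seq[Any] = {
      val byName = fileColStats.map(s => s.columnName -> s).toMap
      targetIndexedColumns.foldLeft(Seq[Any](fileName, valueCount)) { (acc, col) =>
        byName.get(col) match {
          case Some(s) => acc ++ Seq(s.minValue, s.maxValue, s.nullCount)
          // Indexed column with no record for this file: treat it as exclusively
          // null (consistent with reading a non-existent column from Parquet).
          case None    => acc ++ Seq(null, null, valueCount)
        }
      }
    }

    // coalesceRow("f1.parquet", 100L, Seq("c1", "c2"), Seq(ColStat("c1", 1, 9, 0L)))
    //   == Seq("f1.parquet", 100L, 1, 9, 0L, null, null, 100L)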
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
index 7c9649d4499..7ba82931fb6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
@@ -237,9 +237,10 @@ public class ColumnStatsIndexHelper {
.filter(Objects::nonNull);
StructType indexSchema = ColumnStatsIndexSupport$.MODULE$.composeIndexSchema(
- JavaConverters$.MODULE$.collectionAsScalaIterableConverter(columnNames).asScala().toSeq(),
- StructType$.MODULE$.apply(orderedColumnSchemas)
- );
+ JavaConverters$.MODULE$.collectionAsScalaIterableConverter(columnNames).asScala().toSeq(),
+ JavaConverters$.MODULE$.collectionAsScalaIterableConverter(columnNames).asScala().toSet(),
+ StructType$.MODULE$.apply(orderedColumnSchemas)
+ )._1;
return sparkSession.createDataFrame(allMetaDataRDD, indexSchema);
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
index 63db2f52fc2..f995e484d49 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
@@ -83,7 +83,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSuppor
)
)
- val indexSchema: StructType = composeIndexSchema(indexedCols, sourceTableSchema)
+ val (indexSchema: StructType, targetIndexedColumns: Seq[String]) = composeIndexSchema(indexedCols, indexedCols.toSet, sourceTableSchema)
@ParameterizedTest
@MethodSource(Array(
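The new test in the next file exercises DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr against a projection where c4 is not indexed. A simplified sketch of the semantics it asserts: a predicate on a column absent from the index schema degrades to Literal(true), so an AND can still prune on its indexed side, while an OR containing such a predicate prunes nothing. Only that fallback behavior mirrors the real translator; translateSketch itself is hypothetical:

    import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
    import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, GreaterThan, Literal, Or}
    import org.apache.spark.sql.types.StructType

    // Hypothetical translator handling just GreaterThan/And/Or, enough to show the
    // Literal(true) fallback for columns the Column Stats Index does not cover.
    def translateSketch(expr: Expression, indexSchema: StructType): Expression = expr match {
      case GreaterThan(a: AttributeReference, v: Literal)
        if indexSchema.fieldNames.contains(s"${a.name}_maxValue") =>
        // A file may hold rows with a > v only if its max value exceeds v.
        GreaterThan(UnresolvedAttribute(s"${a.name}_maxValue"), v)
      case And(l, r) => And(translateSketch(l, indexSchema), translateSketch(r, indexSchema))
      case Or(l, r)  => Or(translateSketch(l, indexSchema), translateSketch(r, indexSchema))
      // Non-indexed column: conservatively keep every file.
      case _ => Literal.TrueLiteral
    }

    // With only c1 stats in the index schema:
    //   c1 > 1 AND c4 > "x"  ->  c1_maxValue > 1 AND true   (still prunes on c1)
    //   c1 > 1 OR  c4 > "x"  ->  c1_maxValue > 1 OR  true   (always true, no pruning)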
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 9e674db3541..289d641e2e8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -32,7 +32,10 @@ import org.apache.hudi.functional.TestColumnStatsIndex.ColumnStatsTestCase
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, GreaterThan, Literal, Or}
import org.apache.spark.sql.functions.typedLit
+import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
import org.junit.jupiter.api._
@@ -244,7 +247,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
// We have to include "c1", since we sort the expected outputs by this column
val requestedColumns = Seq("c4", "c1")
- val expectedColStatsSchema = composeIndexSchema(requestedColumns.sorted, sourceTableSchema)
+ val (expectedColStatsSchema, _) = composeIndexSchema(requestedColumns.sorted, targetColumnsToIndex.toSet, sourceTableSchema)
// Match against expected column stats table
val expectedColStatsIndexTableDf =
spark.read
@@ -297,7 +300,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
val requestedColumns = sourceTableSchema.fieldNames
- val expectedColStatsSchema = composeIndexSchema(requestedColumns.sorted, sourceTableSchema)
+ val (expectedColStatsSchema, _) = composeIndexSchema(requestedColumns.sorted, targetColumnsToIndex.toSet, sourceTableSchema)
val expectedColStatsIndexUpdatedDF =
spark.read
.schema(expectedColStatsSchema)
@@ -320,6 +323,103 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testTranslateQueryFiltersIntoColumnStatsIndexFilterExpr(shouldReadInMemory: Boolean): Unit = {
+ val targetColumnsToIndex = Seq("c1", "c2", "c3")
+
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+ HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> targetColumnsToIndex.mkString(",")
+ )
+
+ val opts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ ) ++ metadataOpts
+
+ val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json").toString
+
+ // NOTE: Schema here is provided for validation that the input data is in the appropriate format
+ val inputDF = spark.read.schema(sourceTableSchema).json(sourceJSONTablePath)
+
+ inputDF
+ .sort("c1")
+ .repartition(4, new Column("c1"))
+ .write
+ .format("hudi")
+ .options(opts)
+ .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
+ val metadataConfig = HoodieMetadataConfig.newBuilder()
+ .fromProperties(toProperties(metadataOpts))
+ .build()
+
+ ////////////////////////////////////////////////////////////////////////
+ // NOTE: Partial CSI projection
+ // Projection is requested for set of columns some of which are
+ // NOT indexed by the CSI
+ ////////////////////////////////////////////////////////////////////////
+
+ // We have to include "c1", since we sort the expected outputs by this column
+ val requestedColumns = Seq("c4", "c1")
+ val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
+
+ ////////////////////////////////////////////////////////////////////////
+ // Query filter #1: c1 > 1 and c4 > 'c4 field value'
+ // We should filter only for c1
+ ////////////////////////////////////////////////////////////////////////
+ {
+ val andConditionFilters = Seq(
+ GreaterThan(AttributeReference("c1", IntegerType, nullable = true)(), Literal(1)),
+ GreaterThan(AttributeReference("c4", StringType, nullable = true)(), Literal("c4 filed value"))
+ )
+
+ val expectedAndConditionIndexedFilter = And(
+ GreaterThan(UnresolvedAttribute("c1_maxValue"), Literal(1)),
+ Literal(true)
+ )
+
+ columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { partialTransposedColStatsDF =>
+ val andConditionActualFilter = andConditionFilters.map(translateIntoColumnStatsIndexFilterExpr(_, partialTransposedColStatsDF.schema))
+ .reduce(And)
+ assertEquals(expectedAndConditionIndexedFilter, andConditionActualFilter)
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////
+ // Query filter #2: c1 > 1 or c4 > 'c4 field value'
+ // Since c4 is not indexed, we cannot filter any data
+ ////////////////////////////////////////////////////////////////////////
+ {
+ val orConditionFilters = Seq(
+ Or(GreaterThan(AttributeReference("c1", IntegerType, nullable = true)(), Literal(1)),
+ GreaterThan(AttributeReference("c4", StringType, nullable = true)(), Literal("c4 filed value")))
+ )
+
+ val expectedOrConditionIndexedFilter = Or(
+ GreaterThan(UnresolvedAttribute("c1_maxValue"), Literal(1)),
+ Literal(true)
+ )
+
+ columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { partialTransposedColStatsDF =>
+ val orConditionActualFilter = orConditionFilters.map(translateIntoColumnStatsIndexFilterExpr(_, partialTransposedColStatsDF.schema))
+ .reduce(And)
+ assertEquals(expectedOrConditionIndexedFilter, orConditionActualFilter)
+ }
+ }
+ }
+
@Test
def testParquetMetadataRangeExtraction(): Unit = {
val df = generateRandomDataFrame(spark)
@@ -415,6 +515,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
s"sum(1) AS valueCount" +:
df.columns
.filter(col => includedCols.contains(col))
+ .filter(col => indexedCols.contains(col))
.flatMap(col => {
val minColName = s"${col}_minValue"
val maxColName = s"${col}_maxValue"
@@ -450,7 +551,15 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
- val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
+ val indexedColumns: Set[String] = {
+ val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
+ if (customIndexedColumns.isEmpty) {
+ sourceTableSchema.fieldNames.toSet
+ } else {
+ customIndexedColumns.asScala.toSet
+ }
+ }
+ val (expectedColStatsSchema, _) = composeIndexSchema(sourceTableSchema.fieldNames, indexedColumns, sourceTableSchema)
val validationSortColumns = Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue")
columnStatsIndex.loadTransposed(sourceTableSchema.fieldNames, testCase.shouldReadInMemory) { transposedColStatsDF =>