Posted to commits@hudi.apache.org by ak...@apache.org on 2023/02/03 05:39:47 UTC
[hudi] branch master updated: [HUDI-5691] Fixing `HoodiePruneFileSourcePartitions` to properly handle non-partitioned tables (#7833)
This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9c969380233 [HUDI-5691] Fixing `HoodiePruneFileSourcePartitions` to properly handle non-partitioned tables (#7833)
9c969380233 is described below
commit 9c969380233a27c6729aea75d618891e449e19bf
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Thu Feb 2 21:39:40 2023 -0800
[HUDI-5691] Fixing `HoodiePruneFileSourcePartitions` to properly handle non-partitioned tables (#7833)
This change addresses the issue of the `HoodiePruneFileSourcePartitions` rule not being applied to non-partitioned tables, resulting in their size being incorrectly estimated by Spark
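For context, a minimal sketch of how the misestimate shows up. This is illustrative, not code from the commit: it assumes a SparkSession `spark` with Hudi support enabled, and the table name and values are made up (the DDL shape follows the test updated below):

    // Create a non-partitioned Hudi table, then inspect the size Spark
    // estimates for the relation. Prior to this fix the pruning rule bailed
    // out on an empty partition schema, so sizeInBytes could fall back to a
    // default estimate instead of reflecting the actually listed files.
    spark.sql(
      """CREATE TABLE t (id INT, name STRING, price DOUBLE, ts LONG) USING hudi
        |TBLPROPERTIES (type = 'cow', primaryKey = 'id')""".stripMargin)
    spark.sql("INSERT INTO t VALUES (1, 'a1', 10.0, 1000)")
    val df = spark.sql("SELECT * FROM t WHERE price > 0")
    println(df.queryExecution.optimizedPlan.stats.sizeInBytes)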
---
.../analysis/HoodiePruneFileSourcePartitions.scala | 2 +-
.../TestHoodiePruneFileSourcePartitions.scala | 40 ++++++++++++++--------
2 files changed, 27 insertions(+), 15 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
index 3b86777e16e..46cb931a59b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
@@ -41,7 +41,7 @@ case class HoodiePruneFileSourcePartitions(spark: SparkSession) extends Rule[Log
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case op @ PhysicalOperation(projects, filters, lr @ LogicalRelation(HoodieRelationMatcher(fileIndex), _, _, _))
- if sparkAdapter.isHoodieTable(lr, spark) && fileIndex.partitionSchema.nonEmpty && !fileIndex.hasPredicatesPushedDown =>
+ if sparkAdapter.isHoodieTable(lr, spark) && !fileIndex.hasPredicatesPushedDown =>
val deterministicFilters = filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f))
val normalizedFilters = exprUtils.normalizeExprs(deterministicFilters, lr.output)
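The gist of the one-line change above: the `fileIndex.partitionSchema.nonEmpty` conjunct kept the rule from ever firing on non-partitioned tables. A hypothetical distillation of the guard as plain booleans (not the actual rule code, just the condition it evaluates):

    // Before: a non-partitioned table (empty partition schema) never qualified.
    def ruleAppliedBefore(isHoodieTable: Boolean, partitionSchemaNonEmpty: Boolean,
                          predicatesPushedDown: Boolean): Boolean =
      isHoodieTable && partitionSchemaNonEmpty && !predicatesPushedDown

    // After: the rule also covers tables with an empty partition schema.
    def ruleAppliesAfter(isHoodieTable: Boolean, predicatesPushedDown: Boolean): Boolean =
      isHoodieTable && !predicatesPushedDown

    // A non-partitioned Hudi table with nothing pushed down yet:
    assert(!ruleAppliedBefore(isHoodieTable = true, partitionSchemaNonEmpty = false,
                              predicatesPushedDown = false))
    assert(ruleAppliesAfter(isHoodieTable = true, predicatesPushedDown = false))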
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
index 06239697db9..aac2a4027a2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
@@ -54,8 +54,11 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
)
@ParameterizedTest
- @CsvSource(value = Array("cow", "mor"))
- def testPartitionFiltersPushDown(tableType: String): Unit = {
+ @CsvSource(value = Array(
+ "cow,true", "cow,false",
+ "mor,true", "mor,false"
+ ))
+ def testPartitionFiltersPushDown(tableType: String, partitioned: Boolean): Unit = {
spark.sql(
s"""
|CREATE TABLE $tableName (
@@ -65,7 +68,7 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
| ts long,
| partition string
|) USING hudi
- |PARTITIONED BY (partition)
+ |${if (partitioned) "PARTITIONED BY (partition)" else ""}
|TBLPROPERTIES (
| type = '$tableType',
| primaryKey = 'id',
@@ -103,27 +106,37 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
// support (for partition-pruning) will only occur during execution phase, while file-listing
// actually happens during analysis stage
case "eager" =>
- assertEquals(1275, f.stats.sizeInBytes.longValue() / 1024)
- assertEquals(1275, lr.stats.sizeInBytes.longValue() / 1024)
+ // NOTE: In case of partitioned table 3 files will be created, while in case of non-partitioned just 1
+ if (partitioned) {
+ assertEquals(1275, f.stats.sizeInBytes.longValue() / 1024)
+ assertEquals(1275, lr.stats.sizeInBytes.longValue() / 1024)
+ } else {
+ // NOTE: We're adding 512 to make sure we always round to the next integer value
+ assertEquals(425, (f.stats.sizeInBytes.longValue() + 512) / 1024)
+ assertEquals(425, (lr.stats.sizeInBytes.longValue() + 512) / 1024)
+ }
// Case #2: Lazy listing (default mode).
// In case of lazy listing mode, Hudi will only list partitions matching partition-predicates that are
// eagerly pushed down (w/ help of [[HoodiePruneFileSourcePartitions]]) avoiding the necessity to
// list the whole table
case "lazy" =>
- assertEquals(425, f.stats.sizeInBytes.longValue() / 1024)
- assertEquals(425, lr.stats.sizeInBytes.longValue() / 1024)
+ // NOTE: We're adding 512 to make sure we always round to the next integer value
+ assertEquals(425, (f.stats.sizeInBytes.longValue() + 512) / 1024)
+ assertEquals(425, (lr.stats.sizeInBytes.longValue() + 512) / 1024)
case _ => throw new UnsupportedOperationException()
}
- val executionPlan = df.queryExecution.executedPlan
- val expectedPhysicalPlanPartitionFiltersClause = tableType match {
- case "cow" => s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]"
- case "mor" => s"PushedFilters: [IsNotNull(partition), EqualTo(partition,2021-01-05)]"
- }
+ if (partitioned) {
+ val executionPlan = df.queryExecution.executedPlan
+ val expectedPhysicalPlanPartitionFiltersClause = tableType match {
+ case "cow" => s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]"
+ case "mor" => s"PushedFilters: [IsNotNull(partition), EqualTo(partition,2021-01-05)]"
+ }
- Assertions.assertTrue(executionPlan.toString().contains(expectedPhysicalPlanPartitionFiltersClause))
+ Assertions.assertTrue(executionPlan.toString().contains(expectedPhysicalPlanPartitionFiltersClause))
+ }
case _ =>
val failureHint =
@@ -224,5 +237,4 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
}
}
-
}
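One note on the assertions above: adding 512 before dividing by 1024 rounds the byte count to the nearest whole KiB (half-up) instead of truncating toward zero. A tiny self-contained illustration, with a made-up helper name and sample value:

    // (bytes + 512) / 1024 rounds half-up to the nearest KiB, matching the
    // arithmetic used in the test assertions above.
    def toNearestKiB(bytes: Long): Long = (bytes + 512) / 1024

    assert(toNearestKiB(434688L) == 425) // 424.5 KiB rounds up to 425
    assert(434688L / 1024 == 424)        // plain truncation would lose the half KiB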