Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/04 05:53:06 UTC

[hudi] 01/07: [HUDI-5691] Fixing `HoodiePruneFileSourcePartitions` to properly handle non-partitioned tables (#7833)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 3fc2c8ed97012fa698b302bb036b25ff3259cdad
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Thu Feb 2 21:39:40 2023 -0800

    [HUDI-5691] Fixing `HoodiePruneFileSourcePartitions` to properly handle non-partitioned tables (#7833)
    
    This change addresses the issue of the `HoodiePruneFileSourcePartitions` rule not being applied to non-partitioned tables, resulting in their sizes being incorrectly estimated by Spark.
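    
    For illustration, a minimal sketch of the affected scenario (the table name, schema, and session setup are assumptions for this example, not part of the patch). With the previous `fileIndex.partitionSchema.nonEmpty` guard, the rule skipped non-partitioned tables entirely:
    
        // Hypothetical repro, assuming a SparkSession `spark` created with the
        // Hudi Spark extensions enabled; `t1` and its schema are illustrative.
        spark.sql(
          """CREATE TABLE t1 (id int, name string, price double, ts long)
            |USING hudi
            |TBLPROPERTIES (primaryKey = 'id', preCombineField = 'ts')
            |""".stripMargin)
    
        val df = spark.sql("SELECT * FROM t1 WHERE id = 1")
        // Before this fix the rule bailed out on the empty partition schema, so
        // Spark's size estimate for the relation could be incorrect; with the
        // fix the rule runs for non-partitioned tables as well.
        println(df.queryExecution.optimizedPlan.stats.sizeInBytes)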
---
 .../analysis/HoodiePruneFileSourcePartitions.scala |  2 +-
 .../TestHoodiePruneFileSourcePartitions.scala      | 40 ++++++++++++++--------
 2 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
index 3b86777e16e..46cb931a59b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
@@ -41,7 +41,7 @@ case class HoodiePruneFileSourcePartitions(spark: SparkSession) extends Rule[Log
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     case op @ PhysicalOperation(projects, filters, lr @ LogicalRelation(HoodieRelationMatcher(fileIndex), _, _, _))
-      if sparkAdapter.isHoodieTable(lr, spark) && fileIndex.partitionSchema.nonEmpty && !fileIndex.hasPredicatesPushedDown =>
+      if sparkAdapter.isHoodieTable(lr, spark) && !fileIndex.hasPredicatesPushedDown =>
 
       val deterministicFilters = filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f))
       val normalizedFilters = exprUtils.normalizeExprs(deterministicFilters, lr.output)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
index 06239697db9..aac2a4027a2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
@@ -54,8 +54,11 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
     )
 
   @ParameterizedTest
-  @CsvSource(value = Array("cow", "mor"))
-  def testPartitionFiltersPushDown(tableType: String): Unit = {
+  @CsvSource(value = Array(
+    "cow,true", "cow,false",
+    "mor,true", "mor,false"
+  ))
+  def testPartitionFiltersPushDown(tableType: String, partitioned: Boolean): Unit = {
     spark.sql(
       s"""
          |CREATE TABLE $tableName (
@@ -65,7 +68,7 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
          |  ts long,
          |  partition string
          |) USING hudi
-         |PARTITIONED BY (partition)
+         |${if (partitioned) "PARTITIONED BY (partition)" else ""}
          |TBLPROPERTIES (
          |  type = '$tableType',
          |  primaryKey = 'id',
@@ -103,27 +106,37 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
             //          support (for partition-pruning) will only occur during execution phase, while file-listing
             //          actually happens during analysis stage
             case "eager" =>
-              assertEquals(1275, f.stats.sizeInBytes.longValue() / 1024)
-              assertEquals(1275, lr.stats.sizeInBytes.longValue() / 1024)
+              // NOTE: A partitioned table produces 3 files in this test, while a non-partitioned one produces just 1
+              if (partitioned) {
+                assertEquals(1275, f.stats.sizeInBytes.longValue() / 1024)
+                assertEquals(1275, lr.stats.sizeInBytes.longValue() / 1024)
+              } else {
+                // NOTE: We're adding 512 so that the integer division rounds to the nearest KiB instead of truncating
+                assertEquals(425, (f.stats.sizeInBytes.longValue() + 512) / 1024)
+                assertEquals(425, (lr.stats.sizeInBytes.longValue() + 512) / 1024)
+              }
 
             // Case #2: Lazy listing (default mode).
             //          In case of lazy listing mode, Hudi will only list partitions matching partition-predicates that are
             //          eagerly pushed down (w/ help of [[HoodiePruneFileSourcePartitions]]) avoiding the necessity to
             //          list the whole table
             case "lazy" =>
-              assertEquals(425, f.stats.sizeInBytes.longValue() / 1024)
-              assertEquals(425, lr.stats.sizeInBytes.longValue() / 1024)
+              // NOTE: We're adding 512 so that the integer division rounds to the nearest KiB instead of truncating
+              assertEquals(425, (f.stats.sizeInBytes.longValue() + 512) / 1024)
+              assertEquals(425, (lr.stats.sizeInBytes.longValue() + 512) / 1024)
 
             case _ => throw new UnsupportedOperationException()
           }
 
-          val executionPlan = df.queryExecution.executedPlan
-          val expectedPhysicalPlanPartitionFiltersClause = tableType match {
-            case "cow" => s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]"
-            case "mor" => s"PushedFilters: [IsNotNull(partition), EqualTo(partition,2021-01-05)]"
-          }
+          if (partitioned) {
+            val executionPlan = df.queryExecution.executedPlan
+            val expectedPhysicalPlanPartitionFiltersClause = tableType match {
+              case "cow" => s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]"
+              case "mor" => s"PushedFilters: [IsNotNull(partition), EqualTo(partition,2021-01-05)]"
+            }
 
-          Assertions.assertTrue(executionPlan.toString().contains(expectedPhysicalPlanPartitionFiltersClause))
+            Assertions.assertTrue(executionPlan.toString().contains(expectedPhysicalPlanPartitionFiltersClause))
+          }
 
         case _ =>
           val failureHint =
@@ -224,5 +237,4 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
     }
   }
 
-
 }
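
A note on the rounding used in the updated assertions: adding 512 before the integer division by 1024 rounds a byte count to the nearest KiB rather than truncating it, which keeps the expected value stable when the listed size falls just short of a KiB boundary. A standalone sketch (the helper name is illustrative, not from the patch):

    // Half-up rounding of a byte count to KiB, mirroring the assertions above.
    def bytesToNearestKiB(n: Long): Long = (n + 512) / 1024

    assert(bytesToNearestKiB(435000L) == 425) // 424.8 KiB rounds up to 425
    assert(435000L / 1024 == 424)             // plain truncation would undershoot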