Posted to commits@spark.apache.org by gu...@apache.org on 2021/07/27 10:12:53 UTC

[spark] branch branch-3.2 updated: Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package"

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new dcd37f9  Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package"
dcd37f9 is described below

commit dcd37f963906fd57a706ea25cb5893be2559d788
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Tue Jul 27 19:11:42 2021 +0900

    Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package"
    
    This reverts commit 634f96dde40639df5a2ef246884bedbd48b3dc69.
    
    Closes #33533 from viirya/revert-SPARK-36136.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit 22ac98dcbf48575af7912dab2583e38a2a1b751d)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../PruneFileSourcePartitionsSuite.scala           | 61 ++++++++++++----------
 .../execution/PruneHiveTablePartitionsSuite.scala  |  9 +---
 .../hive/execution}/PrunePartitionSuiteBase.scala  | 17 +++---
 3 files changed, 41 insertions(+), 46 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
similarity index 80%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index 510281a..a669b80 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.hive.execution
 
 import org.scalatest.matchers.should.Matchers._
 
@@ -24,19 +24,18 @@ import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.functions.broadcast
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
 
-class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with SharedSparkSession {
+class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase {
 
   override def format: String = "parquet"
 
@@ -46,27 +45,35 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with Shared
 
   test("PruneFileSourcePartitions should not change the output of LogicalRelation") {
     withTable("test") {
-      spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("test")
-      val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
-      val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
-
-      val dataSchema = StructType(tableMeta.schema.filterNot { f =>
-        tableMeta.partitionColumnNames.contains(f.name)
-      })
-      val relation = HadoopFsRelation(
-        location = catalogFileIndex,
-        partitionSchema = tableMeta.partitionSchema,
-        dataSchema = dataSchema,
-        bucketSpec = None,
-        fileFormat = new ParquetFileFormat(),
-        options = Map.empty)(sparkSession = spark)
-
-      val logicalRelation = LogicalRelation(relation, tableMeta)
-      val query = Project(Seq(Symbol("id"), Symbol("p")),
-        Filter(Symbol("p") === 1, logicalRelation)).analyze
-
-      val optimized = Optimize.execute(query)
-      assert(optimized.missingInput.isEmpty)
+      withTempDir { dir =>
+        sql(
+          s"""
+            |CREATE EXTERNAL TABLE test(i int)
+            |PARTITIONED BY (p int)
+            |STORED AS parquet
+            |LOCATION '${dir.toURI}'""".stripMargin)
+
+        val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
+        val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
+
+        val dataSchema = StructType(tableMeta.schema.filterNot { f =>
+          tableMeta.partitionColumnNames.contains(f.name)
+        })
+        val relation = HadoopFsRelation(
+          location = catalogFileIndex,
+          partitionSchema = tableMeta.partitionSchema,
+          dataSchema = dataSchema,
+          bucketSpec = None,
+          fileFormat = new ParquetFileFormat(),
+          options = Map.empty)(sparkSession = spark)
+
+        val logicalRelation = LogicalRelation(relation, tableMeta)
+        val query = Project(Seq(Symbol("i"), Symbol("p")),
+          Filter(Symbol("p") === 1, logicalRelation)).analyze
+
+        val optimized = Optimize.execute(query)
+        assert(optimized.missingInput.isEmpty)
+      }
     }
   }
 
@@ -135,10 +142,6 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with Shared
     }
   }
 
-  protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]] = {
-    case scan: FileSourceScanExec => scan.partitionFilters
-  }
-
   override def getScanExecPartitionSize(plan: SparkPlan): Long = {
     plan.collectFirst {
       case p: FileSourceScanExec => p.selectedPartitions.length
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
index df3acab..677b250 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
@@ -18,16 +18,13 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.PrunePartitionSuiteBase
-import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.LongType
 
-class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase with TestHiveSingleton {
+class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase {
 
   override def format(): String = "hive"
 
@@ -134,10 +131,6 @@ class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase with TestHiv
     }
   }
 
-  protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]] = {
-    case scan: HiveTableScanExec => scan.partitionPruningPred
-  }
-
   override def getScanExecPartitionSize(plan: SparkPlan): Long = {
     plan.collectFirst {
       case p: HiveTableScanExec => p
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PrunePartitionSuiteBase.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala
similarity index 90%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PrunePartitionSuiteBase.scala
rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala
index 9909996..2a690a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PrunePartitionSuiteBase.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.StatisticsCollectionTestBase
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BinaryOperator, Expression, IsNotNull, Literal}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf.ADAPTIVE_EXECUTION_ENABLED
 
-abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase {
+abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase with TestHiveSingleton {
 
   protected def format: String
 
@@ -94,11 +95,11 @@ abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase {
     val plan = qe.sparkPlan
     assert(getScanExecPartitionSize(plan) == expectedPartitionCount)
 
-    val collectFn: PartialFunction[SparkPlan, Seq[Expression]] = collectPartitionFiltersFn orElse {
+    val pushedDownPartitionFilters = plan.collectFirst {
+      case scan: FileSourceScanExec => scan.partitionFilters
+      case scan: HiveTableScanExec => scan.partitionPruningPred
       case BatchScanExec(_, scan: FileScan, _) => scan.partitionFilters
-    }
-    val pushedDownPartitionFilters = plan.collectFirst(collectFn)
-      .map(exps => exps.filterNot(e => e.isInstanceOf[IsNotNull]))
+    }.map(exps => exps.filterNot(e => e.isInstanceOf[IsNotNull]))
     val pushedFilters = pushedDownPartitionFilters.map(filters => {
       filters.foldLeft("")((currentStr, exp) => {
         if (currentStr == "") {
@@ -112,7 +113,5 @@ abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase {
     assert(pushedFilters == Some(expectedPushedDownFilters))
   }
 
-  protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]]
-
   protected def getScanExecPartitionSize(plan: SparkPlan): Long
 }
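
In effect, this revert moves PruneFileSourcePartitionsSuite and PrunePartitionSuiteBase back under org.apache.spark.sql.hive.execution and folds the per-suite collectPartitionFiltersFn hook back into the shared base suite, which again pattern-matches the scan node types directly. A minimal sketch of the restored collection logic, as it appears in PrunePartitionSuiteBase after this commit (illustrative excerpt from the diff above, not a standalone program):

    // Collect the partition filters that were pushed down to the scan,
    // covering file-source, Hive, and DataSource V2 scan nodes, and drop
    // the implicit IsNotNull guards before comparing against expectations.
    val pushedDownPartitionFilters = plan.collectFirst {
      case scan: FileSourceScanExec => scan.partitionFilters
      case scan: HiveTableScanExec => scan.partitionPruningPred
      case BatchScanExec(_, scan: FileScan, _) => scan.partitionFilters
    }.map(exps => exps.filterNot(e => e.isInstanceOf[IsNotNull]))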
