You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/07/27 10:12:53 UTC
[spark] branch branch-3.2 updated: Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package"
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new dcd37f9 Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package"
dcd37f9 is described below
commit dcd37f963906fd57a706ea25cb5893be2559d788
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Tue Jul 27 19:11:42 2021 +0900
Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package"
This reverts commit 634f96dde40639df5a2ef246884bedbd48b3dc69.
Closes #33533 from viirya/revert-SPARK-36136.
Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit 22ac98dcbf48575af7912dab2583e38a2a1b751d)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../PruneFileSourcePartitionsSuite.scala | 61 ++++++++++++----------
.../execution/PruneHiveTablePartitionsSuite.scala | 9 +---
.../hive/execution}/PrunePartitionSuiteBase.scala | 17 +++---
3 files changed, 41 insertions(+), 46 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
similarity index 80%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index 510281a..a669b80 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.hive.execution
import org.scalatest.matchers.should.Matchers._
@@ -24,19 +24,18 @@ import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
-class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with SharedSparkSession {
+class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase {
override def format: String = "parquet"
@@ -46,27 +45,35 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with Shared
test("PruneFileSourcePartitions should not change the output of LogicalRelation") {
withTable("test") {
- spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("test")
- val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
- val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
-
- val dataSchema = StructType(tableMeta.schema.filterNot { f =>
- tableMeta.partitionColumnNames.contains(f.name)
- })
- val relation = HadoopFsRelation(
- location = catalogFileIndex,
- partitionSchema = tableMeta.partitionSchema,
- dataSchema = dataSchema,
- bucketSpec = None,
- fileFormat = new ParquetFileFormat(),
- options = Map.empty)(sparkSession = spark)
-
- val logicalRelation = LogicalRelation(relation, tableMeta)
- val query = Project(Seq(Symbol("id"), Symbol("p")),
- Filter(Symbol("p") === 1, logicalRelation)).analyze
-
- val optimized = Optimize.execute(query)
- assert(optimized.missingInput.isEmpty)
+ withTempDir { dir =>
+ sql(
+ s"""
+ |CREATE EXTERNAL TABLE test(i int)
+ |PARTITIONED BY (p int)
+ |STORED AS parquet
+ |LOCATION '${dir.toURI}'""".stripMargin)
+
+ val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
+ val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
+
+ val dataSchema = StructType(tableMeta.schema.filterNot { f =>
+ tableMeta.partitionColumnNames.contains(f.name)
+ })
+ val relation = HadoopFsRelation(
+ location = catalogFileIndex,
+ partitionSchema = tableMeta.partitionSchema,
+ dataSchema = dataSchema,
+ bucketSpec = None,
+ fileFormat = new ParquetFileFormat(),
+ options = Map.empty)(sparkSession = spark)
+
+ val logicalRelation = LogicalRelation(relation, tableMeta)
+ val query = Project(Seq(Symbol("i"), Symbol("p")),
+ Filter(Symbol("p") === 1, logicalRelation)).analyze
+
+ val optimized = Optimize.execute(query)
+ assert(optimized.missingInput.isEmpty)
+ }
}
}
@@ -135,10 +142,6 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with Shared
}
}
- protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]] = {
- case scan: FileSourceScanExec => scan.partitionFilters
- }
-
override def getScanExecPartitionSize(plan: SparkPlan): Long = {
plan.collectFirst {
case p: FileSourceScanExec => p.selectedPartitions.length
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
index df3acab..677b250 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
@@ -18,16 +18,13 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.PrunePartitionSuiteBase
-import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.LongType
-class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase with TestHiveSingleton {
+class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase {
override def format(): String = "hive"
@@ -134,10 +131,6 @@ class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase with TestHiv
}
}
- protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]] = {
- case scan: HiveTableScanExec => scan.partitionPruningPred
- }
-
override def getScanExecPartitionSize(plan: SparkPlan): Long = {
plan.collectFirst {
case p: HiveTableScanExec => p
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PrunePartitionSuiteBase.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala
similarity index 90%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PrunePartitionSuiteBase.scala
rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala
index 9909996..2a690a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PrunePartitionSuiteBase.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala
@@ -15,15 +15,16 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.StatisticsCollectionTestBase
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BinaryOperator, Expression, IsNotNull, Literal}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf.ADAPTIVE_EXECUTION_ENABLED
-abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase {
+abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase with TestHiveSingleton {
protected def format: String
@@ -94,11 +95,11 @@ abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase {
val plan = qe.sparkPlan
assert(getScanExecPartitionSize(plan) == expectedPartitionCount)
- val collectFn: PartialFunction[SparkPlan, Seq[Expression]] = collectPartitionFiltersFn orElse {
+ val pushedDownPartitionFilters = plan.collectFirst {
+ case scan: FileSourceScanExec => scan.partitionFilters
+ case scan: HiveTableScanExec => scan.partitionPruningPred
case BatchScanExec(_, scan: FileScan, _) => scan.partitionFilters
- }
- val pushedDownPartitionFilters = plan.collectFirst(collectFn)
- .map(exps => exps.filterNot(e => e.isInstanceOf[IsNotNull]))
+ }.map(exps => exps.filterNot(e => e.isInstanceOf[IsNotNull]))
val pushedFilters = pushedDownPartitionFilters.map(filters => {
filters.foldLeft("")((currentStr, exp) => {
if (currentStr == "") {
@@ -112,7 +113,5 @@ abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase {
assert(pushedFilters == Some(expectedPushedDownFilters))
}
- protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]]
-
protected def getScanExecPartitionSize(plan: SparkPlan): Long
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org