Posted to commits@spark.apache.org by su...@apache.org on 2022/06/21 17:40:24 UTC
[spark] branch master updated: [SPARK-38647][SQL] Add SupportsReportOrdering mix in interface for Scan (DataSourceV2)
This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b588d070ebc [SPARK-38647][SQL] Add SupportsReportOrdering mix in interface for Scan (DataSourceV2)
b588d070ebc is described below
commit b588d070ebc234280af730c8f9915e8859b1886e
Author: Enrico Minack <gi...@enrico.minack.dev>
AuthorDate: Tue Jun 21 10:40:06 2022 -0700
[SPARK-38647][SQL] Add SupportsReportOrdering mix in interface for Scan (DataSourceV2)
### What changes were proposed in this pull request?
Just as `SupportsReportPartitioning` allows implementations of `Scan` to provide Spark with information about the existing partitioning of data read by a `DataSourceV2`, a similar mix-in interface `SupportsReportOrdering` should provide ordering information.
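For illustration, a minimal connector-side sketch of the new mix-in (hypothetical: `MyScan` and the column names are placeholders; the anonymous `SortOrder` follows the same pattern as the `MySortOrder` helpers in the test sources of this commit):

```scala
import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NullOrdering, SortDirection, SortOrder}
import org.apache.spark.sql.connector.read.{Scan, SupportsReportOrdering}
import org.apache.spark.sql.types.StructType

// Hypothetical scan reporting that each partition is already sorted by `i` ascending.
class MyScan extends Scan with SupportsReportOrdering {
  override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")

  override def outputOrdering(): Array[SortOrder] = Array(new SortOrder {
    override def expression(): Expression = FieldReference("i")
    override def direction(): SortDirection = SortDirection.ASCENDING
    override def nullOrdering(): NullOrdering = NullOrdering.NULLS_FIRST
  })
}
```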
### Why are the changes needed?
This allows Spark to avoid sorting data that the source already delivers in the required order.
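As a usage sketch against the `OrderAndPartitionAwareDataSource` test source added below (assuming a `spark` session in scope; the `partitionKeys`/`orderKeys` options are specific to that test class, not a general API):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val df = spark.read
  .format(classOf[OrderAndPartitionAwareDataSource].getName)
  .option("partitionKeys", "i")  // scan reports key-grouped partitioning by `i`
  .option("orderKeys", "i,j")    // scan reports per-partition order by `i`, then `j`
  .load()

// Window partitioned by `i` and ordered by `j`: with the reported partitioning
// and in-partition order, the plan needs neither ShuffleExchangeExec nor SortExec.
val windowed = df.withColumn("no", row_number().over(Window.partitionBy("i").orderBy("j")))
```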
### Does this PR introduce _any_ user-facing change?
It adds the `SupportsReportOrdering` mix-in interface.
### How was this patch tested?
This adds tests to `DataSourceV2Suite`, similar to the test for `SupportsReportPartitioning`.
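The tests assert the presence or absence of shuffle and sort in the executed plan; the pattern, roughly (`plan` standing for `query.queryExecution.executedPlan`):

```scala
import org.apache.spark.sql.execution.{SortExec, SparkPlan}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// True if the physical plan still contains a shuffle / a sort, respectively.
def hasShuffle(plan: SparkPlan): Boolean =
  plan.collectFirst { case e: ShuffleExchangeExec => e }.isDefined
def hasSort(plan: SparkPlan): Boolean =
  plan.collectFirst { case e: SortExec => e }.isDefined
```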
Closes #35965 from EnricoMi/branch-datasourcev2-output-ordering.
Authored-by: Enrico Minack <gi...@enrico.minack.dev>
Signed-off-by: Chao Sun <su...@apple.com>
---
.../apache/spark/sql/avro/AvroRowReaderSuite.scala | 2 +-
.../org/apache/spark/sql/avro/AvroSuite.scala | 6 +-
.../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 2 +-
.../sql/connector/read/SupportsReportOrdering.java | 40 +++++
.../datasources/v2/DataSourceV2Relation.scala | 6 +-
.../main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
.../spark/sql/execution/SparkOptimizer.scala | 6 +-
.../execution/datasources/v2/BatchScanExec.scala | 3 +-
.../datasources/v2/ContinuousScanExec.scala | 3 +-
.../datasources/v2/DataSourceV2ScanExecBase.scala | 12 +-
.../datasources/v2/DataSourceV2Strategy.scala | 8 +-
.../datasources/v2/MicroBatchScanExec.scala | 5 +-
...g.scala => V2ScanPartitioningAndOrdering.scala} | 26 ++-
.../dynamicpruning/PartitionPruning.scala | 2 +-
.../JavaOrderAndPartitionAwareDataSource.java | 160 ++++++++++++++++++
.../spark/sql/FileBasedDataSourceSuite.scala | 4 +-
.../spark/sql/connector/DataSourceV2Suite.scala | 180 +++++++++++++++++++--
.../datasources/PrunePartitionSuiteBase.scala | 2 +-
.../execution/datasources/orc/OrcFilterSuite.scala | 2 +-
.../sql/execution/datasources/orc/OrcTest.scala | 2 +-
.../datasources/orc/OrcV2SchemaPruningSuite.scala | 2 +-
.../datasources/parquet/ParquetFilterSuite.scala | 2 +-
22 files changed, 427 insertions(+), 50 deletions(-)
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala
index 08c61381c57..2f4248e88d5 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala
@@ -59,7 +59,7 @@ class AvroRowReaderSuite
val df = spark.read.format("avro").load(dir.getCanonicalPath)
val fileScan = df.queryExecution.executedPlan collectFirst {
- case BatchScanExec(_, f: AvroScan, _, _) => f
+ case BatchScanExec(_, f: AvroScan, _, _, _) => f
}
val filePath = fileScan.get.fileIndex.inputFiles(0)
val fileSize = new File(new URI(filePath)).length
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index e9dd1a1bf50..d75e6906719 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -2335,7 +2335,7 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
})
val fileScan = df.queryExecution.executedPlan collectFirst {
- case BatchScanExec(_, f: AvroScan, _, _) => f
+ case BatchScanExec(_, f: AvroScan, _, _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.nonEmpty)
@@ -2368,7 +2368,7 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
assert(filterCondition.isDefined)
val fileScan = df.queryExecution.executedPlan collectFirst {
- case BatchScanExec(_, f: AvroScan, _, _) => f
+ case BatchScanExec(_, f: AvroScan, _, _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.isEmpty)
@@ -2449,7 +2449,7 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
.where("value = 'a'")
val fileScan = df.queryExecution.executedPlan collectFirst {
- case BatchScanExec(_, f: AvroScan, _, _) => f
+ case BatchScanExec(_, f: AvroScan, _, _, _) => f
}
assert(fileScan.nonEmpty)
if (filtersPushdown) {
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 9f93fbf96d2..5f0033490d5 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -372,7 +372,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
private def checkAggregatePushed(df: DataFrame, funcName: String): Unit = {
df.queryExecution.optimizedPlan.collect {
- case DataSourceV2ScanRelation(_, scan, _, _) =>
+ case DataSourceV2ScanRelation(_, scan, _, _, _) =>
assert(scan.isInstanceOf[V1ScanWrapper])
val wrapper = scan.asInstanceOf[V1ScanWrapper]
assert(wrapper.pushedDownOperators.aggregation.isDefined)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsReportOrdering.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsReportOrdering.java
new file mode 100644
index 00000000000..0dc102d11c2
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsReportOrdering.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.SortOrder;
+
+/**
+ * A mix in interface for {@link Scan}. Data sources can implement this interface to
+ * report the order of data in each partition to Spark.
+ * Global order is part of the partitioning, see {@link SupportsReportPartitioning}.
+ * <p>
+ * Spark uses ordering information to exploit existing order to avoid sorting required by
+ * subsequent operations.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public interface SupportsReportOrdering extends Scan {
+
+ /**
+ * Returns the order in each partition of this data source scan.
+ */
+ SortOrder[] outputOrdering();
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 61fe3602bb6..178a6b02875 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
@@ -115,12 +115,14 @@ case class DataSourceV2Relation(
* @param output the output attributes of this relation
* @param keyGroupedPartitioning if set, the partitioning expressions that are used to split the
* rows in the scan across different partitions
+ * @param ordering if set, the ordering provided by the scan
*/
case class DataSourceV2ScanRelation(
relation: DataSourceV2Relation,
scan: Scan,
output: Seq[AttributeReference],
- keyGroupedPartitioning: Option[Seq[Expression]] = None) extends LeafNode with NamedRelation {
+ keyGroupedPartitioning: Option[Seq[Expression]] = None,
+ ordering: Option[Seq[SortOrder]] = None) extends LeafNode with NamedRelation {
override def name: String = relation.table.name()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 97a5318b3ed..d79d50ced2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3689,7 +3689,7 @@ class Dataset[T] private[sql](
case r: HiveTableRelation =>
r.tableMeta.storage.locationUri.map(_.toString).toArray
case DataSourceV2ScanRelation(DataSourceV2Relation(table: FileTable, _, _, _, _),
- _, _, _) =>
+ _, _, _, _) =>
table.fileIndex.inputFiles
}.flatten
files.toSet.toArray
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 056c16affc2..bffa1d1dae7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
import org.apache.spark.sql.execution.datasources.SchemaPruning
-import org.apache.spark.sql.execution.datasources.v2.{GroupBasedRowLevelOperationScanPlanning, OptimizeMetadataOnlyDeleteFromTable, V2ScanPartitioning, V2ScanRelationPushDown, V2Writes}
+import org.apache.spark.sql.execution.datasources.v2.{GroupBasedRowLevelOperationScanPlanning, OptimizeMetadataOnlyDeleteFromTable, V2ScanPartitioningAndOrdering, V2ScanRelationPushDown, V2Writes}
import org.apache.spark.sql.execution.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning}
import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs}
@@ -40,7 +40,7 @@ class SparkOptimizer(
Seq(SchemaPruning) :+
GroupBasedRowLevelOperationScanPlanning :+
V2ScanRelationPushDown :+
- V2ScanPartitioning :+
+ V2ScanPartitioningAndOrdering :+
V2Writes :+
PruneFileSourcePartitions
@@ -86,7 +86,7 @@ class SparkOptimizer(
ExtractPythonUDFs.ruleName :+
GroupBasedRowLevelOperationScanPlanning.ruleName :+
V2ScanRelationPushDown.ruleName :+
- V2ScanPartitioning.ruleName :+
+ V2ScanPartitioningAndOrdering.ruleName :+
V2Writes.ruleName :+
ReplaceCTERefWithRepartition.ruleName
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index 0b813d52cee..ba969eb6ff1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -37,7 +37,8 @@ case class BatchScanExec(
output: Seq[AttributeReference],
@transient scan: Scan,
runtimeFilters: Seq[Expression],
- keyGroupedPartitioning: Option[Seq[Expression]] = None) extends DataSourceV2ScanExecBase {
+ keyGroupedPartitioning: Option[Seq[Expression]] = None,
+ ordering: Option[Seq[SortOrder]] = None) extends DataSourceV2ScanExecBase {
@transient lazy val batch = scan.toBatch
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
index 5f973e10b80..cf16a81eaf9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
@@ -32,7 +32,8 @@ case class ContinuousScanExec(
@transient scan: Scan,
@transient stream: ContinuousStream,
@transient start: Offset,
- keyGroupedPartitioning: Option[Seq[Expression]] = None) extends DataSourceV2ScanExecBase {
+ keyGroupedPartitioning: Option[Seq[Expression]] = None,
+ ordering: Option[Seq[SortOrder]] = None) extends DataSourceV2ScanExecBase {
// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index 42909986fce..c5926edb6c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, SinglePartition}
import org.apache.spark.sql.catalyst.util.truncatedString
@@ -50,6 +50,10 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
* `SupportsReportPartitioning` */
def keyGroupedPartitioning: Option[Seq[Expression]]
+ /** Optional ordering expressions provided by the V2 data sources, through
+ * `SupportsReportOrdering` */
+ def ordering: Option[Seq[SortOrder]]
+
protected def inputPartitions: Seq[InputPartition]
override def simpleString(maxFields: Int): String = {
@@ -138,6 +142,12 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
}
}
+ override def outputOrdering: Seq[SortOrder] = {
+ // when multiple partitions are grouped together, ordering inside partitions is not preserved
+ val partitioningPreservesOrdering = groupedPartitions.forall(_.forall(_._2.length <= 1))
+ ordering.filter(_ => partitioningPreservesOrdering).getOrElse(super.outputOrdering)
+ }
+
override def supportsColumnar: Boolean = {
require(inputPartitions.forall(readerFactory.supportColumnarReads) ||
!inputPartitions.exists(readerFactory.supportColumnarReads),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 2add527a359..28dbf8b13f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -106,7 +106,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(project, filters, DataSourceV2ScanRelation(
- _, V1ScanWrapper(scan, pushed, pushedDownOperators), output, _)) =>
+ _, V1ScanWrapper(scan, pushed, pushedDownOperators), output, _, _)) =>
val v1Relation = scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
if (v1Relation.schema != scan.readSchema()) {
throw QueryExecutionErrors.fallbackV1RelationReportsInconsistentSchemaError(
@@ -127,7 +127,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
withProjectAndFilter(project, filters, dsScan, needsUnsafeConversion = false) :: Nil
case PhysicalOperation(project, filters,
- DataSourceV2ScanRelation(_, scan: LocalScan, output, _)) =>
+ DataSourceV2ScanRelation(_, scan: LocalScan, output, _, _)) =>
val localScanExec = LocalTableScanExec(output, scan.rows().toSeq)
withProjectAndFilter(project, filters, localScanExec, needsUnsafeConversion = false) :: Nil
@@ -140,7 +140,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case _ => false
}
val batchExec = BatchScanExec(relation.output, relation.scan, runtimeFilters,
- relation.keyGroupedPartitioning)
+ relation.keyGroupedPartitioning, relation.ordering)
withProjectAndFilter(project, postScanFilters, batchExec, !batchExec.supportsColumnar) :: Nil
case PhysicalOperation(p, f, r: StreamingDataSourceV2Relation)
@@ -267,7 +267,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case DeleteFromTable(relation, condition) =>
relation match {
- case DataSourceV2ScanRelation(r, _, output, _) =>
+ case DataSourceV2ScanRelation(r, _, output, _, _) =>
val table = r.table
if (SubqueryExpression.hasSubquery(condition)) {
throw QueryCompilationErrors.unsupportedDeleteByConditionWithSubqueryError(condition)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
index 3db7fb78512..cb99cd73dad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder}
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
@@ -32,7 +32,8 @@ case class MicroBatchScanExec(
@transient stream: MicroBatchStream,
@transient start: Offset,
@transient end: Offset,
- keyGroupedPartitioning: Option[Seq[Expression]] = None) extends DataSourceV2ScanExecBase {
+ keyGroupedPartitioning: Option[Seq[Expression]] = None,
+ ordering: Option[Seq[SortOrder]] = None) extends DataSourceV2ScanExecBase {
// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala
similarity index 68%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioning.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala
index 9a5a7e6aab6..7ea1ca8c244 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala
@@ -21,18 +21,26 @@ import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.FunctionCatalog
-import org.apache.spark.sql.connector.read.SupportsReportPartitioning
+import org.apache.spark.sql.connector.read.{SupportsReportOrdering, SupportsReportPartitioning}
import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, UnknownPartitioning}
import org.apache.spark.util.collection.Utils.sequenceToOption
/**
* Extracts [[DataSourceV2ScanRelation]] from the input logical plan, converts any V2 partitioning
- * reported by data sources to their catalyst counterparts. Then, annotates the plan with the
- * result.
+ * and ordering reported by data sources to their catalyst counterparts. Then, annotates the plan
+ * with the partitioning and ordering result.
*/
-object V2ScanPartitioning extends Rule[LogicalPlan] with SQLConfHelper {
- override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
- case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportPartitioning, _, None) =>
+object V2ScanPartitioningAndOrdering extends Rule[LogicalPlan] with SQLConfHelper {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ val scanRules = Seq[LogicalPlan => LogicalPlan] (partitioning, ordering)
+
+ scanRules.foldLeft(plan) { (newPlan, scanRule) =>
+ scanRule(newPlan)
+ }
+ }
+
+ private def partitioning(plan: LogicalPlan) = plan.transformDown {
+ case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportPartitioning, _, None, _) =>
val funCatalogOpt = relation.catalog.flatMap {
case c: FunctionCatalog => Some(c)
case _ => None
@@ -48,4 +56,10 @@ object V2ScanPartitioning extends Rule[LogicalPlan] with SQLConfHelper {
d.copy(keyGroupedPartitioning = catalystPartitioning)
}
+
+ private def ordering(plan: LogicalPlan) = plan.transformDown {
+ case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportOrdering, _, _, _) =>
+ val ordering = V2ExpressionUtils.toCatalystOrdering(scan.outputOrdering(), relation)
+ d.copy(ordering = Some(ordering))
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
index 402c59bc3de..61a243ddb33 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
@@ -78,7 +78,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
} else {
None
}
- case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsRuntimeFiltering, _, _)) =>
+ case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsRuntimeFiltering, _, _, _)) =>
val filterAttrs = V2ExpressionUtils.resolveRefs[Attribute](scan.filterAttributes, r)
if (resExp.references.subsetOf(AttributeSet(filterAttrs))) {
Some(r)
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaOrderAndPartitionAwareDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaOrderAndPartitionAwareDataSource.java
new file mode 100644
index 00000000000..406827038d4
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaOrderAndPartitionAwareDataSource.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.connector;
+
+import java.util.Arrays;
+
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.expressions.*;
+import org.apache.spark.sql.connector.read.*;
+import org.apache.spark.sql.connector.read.partitioning.KeyGroupedPartitioning;
+import org.apache.spark.sql.connector.read.partitioning.Partitioning;
+import org.apache.spark.sql.connector.read.partitioning.UnknownPartitioning;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class JavaOrderAndPartitionAwareDataSource extends JavaPartitionAwareDataSource {
+
+ static class MyScanBuilder extends JavaPartitionAwareDataSource.MyScanBuilder
+ implements SupportsReportOrdering {
+
+ private final Partitioning partitioning;
+ private final SortOrder[] ordering;
+
+ MyScanBuilder(String partitionKeys, String orderKeys) {
+ if (partitionKeys != null) {
+ String[] keys = partitionKeys.split(",");
+ Expression[] clustering = new Transform[keys.length];
+ for (int i = 0; i < keys.length; i++) {
+ clustering[i] = Expressions.identity(keys[i]);
+ }
+ this.partitioning = new KeyGroupedPartitioning(clustering, 2);
+ } else {
+ this.partitioning = new UnknownPartitioning(2);
+ }
+
+ if (orderKeys != null) {
+ String[] keys = orderKeys.split(",");
+ this.ordering = new SortOrder[keys.length];
+ for (int i = 0; i < keys.length; i++) {
+ this.ordering[i] = new MySortOrder(keys[i]);
+ }
+ } else {
+ this.ordering = new SortOrder[0];
+ }
+ }
+
+ @Override
+ public InputPartition[] planInputPartitions() {
+ InputPartition[] partitions = new InputPartition[2];
+ partitions[0] = new SpecificInputPartition(new int[]{1, 1, 3}, new int[]{4, 5, 5});
+ partitions[1] = new SpecificInputPartition(new int[]{2, 4, 4}, new int[]{6, 1, 2});
+ return partitions;
+ }
+
+ @Override
+ public Partitioning outputPartitioning() {
+ return this.partitioning;
+ }
+
+ @Override
+ public SortOrder[] outputOrdering() {
+ return this.ordering;
+ }
+ }
+
+ @Override
+ public Table getTable(CaseInsensitiveStringMap options) {
+ return new JavaSimpleBatchTable() {
+ @Override
+ public Transform[] partitioning() {
+ String partitionKeys = options.get("partitionKeys");
+ if (partitionKeys == null) {
+ return new Transform[0];
+ } else {
+ return (Transform[]) Arrays.stream(partitionKeys.split(","))
+ .map(Expressions::identity).toArray();
+ }
+ }
+
+ @Override
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+ return new MyScanBuilder(options.get("partitionKeys"), options.get("orderKeys"));
+ }
+ };
+ }
+
+ static class MySortOrder implements SortOrder {
+ private final Expression expression;
+
+ MySortOrder(String columnName) {
+ this.expression = new MyIdentityTransform(new MyNamedReference(columnName));
+ }
+
+ @Override
+ public Expression expression() {
+ return expression;
+ }
+
+ @Override
+ public SortDirection direction() {
+ return SortDirection.ASCENDING;
+ }
+
+ @Override
+ public NullOrdering nullOrdering() {
+ return NullOrdering.NULLS_FIRST;
+ }
+ }
+
+ static class MyNamedReference implements NamedReference {
+ private final String[] parts;
+
+ MyNamedReference(String part) {
+ this.parts = new String[] { part };
+ }
+
+ @Override
+ public String[] fieldNames() {
+ return this.parts;
+ }
+ }
+
+ static class MyIdentityTransform implements Transform {
+ private final Expression[] args;
+
+ MyIdentityTransform(NamedReference namedReference) {
+ this.args = new Expression[] { namedReference };
+ }
+
+ @Override
+ public String name() {
+ return "identity";
+ }
+
+ @Override
+ public NamedReference[] references() {
+ return new NamedReference[0];
+ }
+
+ @Override
+ public Expression[] arguments() {
+ return this.args;
+ }
+ }
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 5011a7713a5..d024cb2a6ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -849,7 +849,7 @@ class FileBasedDataSourceSuite extends QueryTest
})
val fileScan = df.queryExecution.executedPlan collectFirst {
- case BatchScanExec(_, f: FileScan, _, _) => f
+ case BatchScanExec(_, f: FileScan, _, _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.nonEmpty)
@@ -889,7 +889,7 @@ class FileBasedDataSourceSuite extends QueryTest
assert(filterCondition.isDefined)
val fileScan = df.queryExecution.executedPlan collectFirst {
- case BatchScanExec(_, f: FileScan, _, _) => f
+ case BatchScanExec(_, f: FileScan, _, _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.isEmpty)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index 3fefaf72df4..5c4be75e02c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -26,14 +26,16 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.catalog.TableCapability._
-import org.apache.spark.sql.connector.expressions.{FieldReference, Literal, Transform}
+import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, Literal, NamedReference, NullOrdering, SortDirection, SortOrder, Transform}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning}
+import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{Filter, GreaterThan}
@@ -251,27 +253,27 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
val df = spark.read.format(cls.getName).load()
checkAnswer(df, Seq(Row(1, 4), Row(1, 4), Row(3, 6), Row(2, 6), Row(4, 2), Row(4, 2)))
- val groupByColA = df.groupBy($"i").agg(sum($"j"))
- checkAnswer(groupByColA, Seq(Row(1, 8), Row(2, 6), Row(3, 6), Row(4, 4)))
- assert(collectFirst(groupByColA.queryExecution.executedPlan) {
+ val groupByColI = df.groupBy($"i").agg(sum($"j"))
+ checkAnswer(groupByColI, Seq(Row(1, 8), Row(2, 6), Row(3, 6), Row(4, 4)))
+ assert(collectFirst(groupByColI.queryExecution.executedPlan) {
case e: ShuffleExchangeExec => e
}.isEmpty)
- val groupByColAB = df.groupBy($"i", $"j").agg(count("*"))
- checkAnswer(groupByColAB, Seq(Row(1, 4, 2), Row(2, 6, 1), Row(3, 6, 1), Row(4, 2, 2)))
- assert(collectFirst(groupByColAB.queryExecution.executedPlan) {
+ val groupByColIJ = df.groupBy($"i", $"j").agg(count("*"))
+ checkAnswer(groupByColIJ, Seq(Row(1, 4, 2), Row(2, 6, 1), Row(3, 6, 1), Row(4, 2, 2)))
+ assert(collectFirst(groupByColIJ.queryExecution.executedPlan) {
case e: ShuffleExchangeExec => e
}.isEmpty)
- val groupByColB = df.groupBy($"j").agg(sum($"i"))
- checkAnswer(groupByColB, Seq(Row(2, 8), Row(4, 2), Row(6, 5)))
- assert(collectFirst(groupByColB.queryExecution.executedPlan) {
+ val groupByColJ = df.groupBy($"j").agg(sum($"i"))
+ checkAnswer(groupByColJ, Seq(Row(2, 8), Row(4, 2), Row(6, 5)))
+ assert(collectFirst(groupByColJ.queryExecution.executedPlan) {
case e: ShuffleExchangeExec => e
}.isDefined)
- val groupByAPlusB = df.groupBy($"i" + $"j").agg(count("*"))
- checkAnswer(groupByAPlusB, Seq(Row(5, 2), Row(6, 2), Row(8, 1), Row(9, 1)))
- assert(collectFirst(groupByAPlusB.queryExecution.executedPlan) {
+ val groupByIPlusJ = df.groupBy($"i" + $"j").agg(count("*"))
+ checkAnswer(groupByIPlusJ, Seq(Row(5, 2), Row(6, 2), Row(8, 1), Row(9, 1)))
+ assert(collectFirst(groupByIPlusJ.queryExecution.executedPlan) {
case e: ShuffleExchangeExec => e
}.isDefined)
}
@@ -279,6 +281,90 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
}
}
+ test("ordering and partitioning reporting") {
+ withSQLConf(SQLConf.V2_BUCKETING_ENABLED.key -> "true") {
+ Seq(
+ classOf[OrderAndPartitionAwareDataSource],
+ classOf[JavaOrderAndPartitionAwareDataSource]
+ ).foreach { cls =>
+ withClue(cls.getName) {
+ // we test report ordering (together with report partitioning) with these transformations:
+ // - groupBy("i").flatMapGroups:
+ // hash-partitions by "i" and sorts each partition by "i"
+ // requires partitioning and sort by "i"
+ // - aggregation function over window partitioned by "i" and ordered by "j":
+ // hash-partitions by "i" and sorts each partition by "j"
+ // requires partitioning by "i" and sort by "i" and "j"
+ Seq(
+ // with no partitioning and no order, we expect shuffling AND sorting
+ (None, None, (true, true), (true, true)),
+ // partitioned by i and no order, we expect NO shuffling BUT sorting
+ (Some("i"), None, (false, true), (false, true)),
+ // partitioned by i and in-partition sorted by i,
+ // we expect NO shuffling AND sorting for groupBy but sorting for window function
+ (Some("i"), Some("i"), (false, false), (false, true)),
+ // partitioned by i and in-partition sorted by j, we expect NO shuffling BUT sorting
+ (Some("i"), Some("j"), (false, true), (false, true)),
+ // partitioned by i and in-partition sorted by i,j, we expect NO shuffling NOR sorting
+ (Some("i"), Some("i,j"), (false, false), (false, false)),
+ // partitioned by j and in-partition sorted by i, we expect shuffling AND sorting
+ (Some("j"), Some("i"), (true, true), (true, true)),
+ // partitioned by j and in-partition sorted by i,j, we expect shuffling and sorting
+ (Some("j"), Some("i,j"), (true, true), (true, true))
+ ).foreach { testParams =>
+ val (partitionKeys, orderKeys, groupByExpects, windowFuncExpects) = testParams
+
+ withClue(f"${partitionKeys.orNull} ${orderKeys.orNull}") {
+ val df = spark.read
+ .option("partitionKeys", partitionKeys.orNull)
+ .option("orderKeys", orderKeys.orNull)
+ .format(cls.getName)
+ .load()
+ checkAnswer(df, Seq(Row(1, 4), Row(1, 5), Row(3, 5), Row(2, 6), Row(4, 1), Row(4, 2)))
+
+ // groupBy(i).flatMapGroups
+ {
+ val groupBy = df.groupBy($"i").as[Int, (Int, Int)]
+ .flatMapGroups { (i: Int, it: Iterator[(Int, Int)]) =>
+ Iterator.single((i, it.length)) }
+ checkAnswer(
+ groupBy.toDF(),
+ Seq(Row(1, 2), Row(2, 1), Row(3, 1), Row(4, 2))
+ )
+
+ val (shuffleExpected, sortExpected) = groupByExpects
+ assert(collectFirst(groupBy.queryExecution.executedPlan) {
+ case e: ShuffleExchangeExec => e
+ }.isDefined === shuffleExpected)
+ assert(collectFirst(groupBy.queryExecution.executedPlan) {
+ case e: SortExec => e
+ }.isDefined === sortExpected)
+ }
+
+ // aggregation function over window partitioned by i and ordered by j
+ {
+ val windowPartByColIOrderByColJ = df.withColumn("no",
+ row_number() over Window.partitionBy(Symbol("i")).orderBy(Symbol("j"))
+ )
+ checkAnswer(windowPartByColIOrderByColJ, Seq(
+ Row(1, 4, 1), Row(1, 5, 2), Row(2, 6, 1), Row(3, 5, 1), Row(4, 1, 1), Row(4, 2, 2)
+ ))
+
+ val (shuffleExpected, sortExpected) = windowFuncExpects
+ assert(collectFirst(windowPartByColIOrderByColJ.queryExecution.executedPlan) {
+ case e: ShuffleExchangeExec => e
+ }.isDefined === shuffleExpected)
+ assert(collectFirst(windowPartByColIOrderByColJ.queryExecution.executedPlan) {
+ case e: SortExec => e
+ }.isDefined === sortExpected)
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
test ("statistics report data source") {
Seq(classOf[ReportStatisticsDataSource], classOf[JavaReportStatisticsDataSource]).foreach {
cls =>
@@ -862,10 +948,10 @@ object ColumnarReaderFactory extends PartitionReaderFactory {
class PartitionAwareDataSource extends TestingV2Source {
class MyScanBuilder extends SimpleScanBuilder
- with SupportsReportPartitioning{
+ with SupportsReportPartitioning {
override def planInputPartitions(): Array[InputPartition] = {
- // Note that we don't have same value of column `a` across partitions.
+ // Note that we don't have same value of column `i` across partitions.
Array(
SpecificInputPartition(Array(1, 1, 3), Array(4, 4, 6)),
SpecificInputPartition(Array(2, 4, 4), Array(6, 2, 2)))
@@ -886,6 +972,68 @@ class PartitionAwareDataSource extends TestingV2Source {
}
}
+class OrderAndPartitionAwareDataSource extends PartitionAwareDataSource {
+
+ class MyScanBuilder(
+ val partitionKeys: Option[Seq[String]],
+ val orderKeys: Seq[String])
+ extends SimpleScanBuilder
+ with SupportsReportPartitioning with SupportsReportOrdering {
+
+ override def planInputPartitions(): Array[InputPartition] = {
+ // data are partitioned by column `i` or `j`, so we can report any partitioning
+ // column `i` is not ordered globally, but within partitions, together with`j`
+ // this allows us to report ordering by [i] and [i, j]
+ Array(
+ SpecificInputPartition(Array(1, 1, 3), Array(4, 5, 5)),
+ SpecificInputPartition(Array(2, 4, 4), Array(6, 1, 2)))
+ }
+
+ override def createReaderFactory(): PartitionReaderFactory = {
+ SpecificReaderFactory
+ }
+
+ override def outputPartitioning(): Partitioning = {
+ partitionKeys.map(keys =>
+ new KeyGroupedPartitioning(keys.map(FieldReference(_)).toArray, 2)
+ ).getOrElse(
+ new UnknownPartitioning(2)
+ )
+ }
+
+ override def outputOrdering(): Array[SortOrder] = orderKeys.map(
+ new MySortOrder(_)
+ ).toArray
+ }
+
+ override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+ new MyScanBuilder(
+ Option(options.get("partitionKeys")).map(_.split(",")),
+ Option(options.get("orderKeys")).map(_.split(",").toSeq).getOrElse(Seq.empty)
+ )
+ }
+ }
+
+ class MySortOrder(columnName: String) extends SortOrder {
+ override def expression(): Expression = new MyIdentityTransform(
+ new MyNamedReference(columnName)
+ )
+ override def direction(): SortDirection = SortDirection.ASCENDING
+ override def nullOrdering(): NullOrdering = NullOrdering.NULLS_FIRST
+ }
+
+ class MyNamedReference(parts: String*) extends NamedReference {
+ override def fieldNames(): Array[String] = parts.toArray
+ }
+
+ class MyIdentityTransform(namedReference: NamedReference) extends Transform {
+ override def name(): String = "identity"
+ override def references(): Array[NamedReference] = Array.empty
+ override def arguments(): Array[Expression] = Seq(namedReference).toArray
+ }
+}
+
case class SpecificInputPartition(
i: Array[Int],
j: Array[Int]) extends InputPartition with HasPartitionKey {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PrunePartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PrunePartitionSuiteBase.scala
index 775f34f1f61..2d77dc1836f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PrunePartitionSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PrunePartitionSuiteBase.scala
@@ -95,7 +95,7 @@ abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase {
assert(getScanExecPartitionSize(plan) == expectedPartitionCount)
val collectFn: PartialFunction[SparkPlan, Seq[Expression]] = collectPartitionFiltersFn orElse {
- case BatchScanExec(_, scan: FileScan, _, _) => scan.partitionFilters
+ case BatchScanExec(_, scan: FileScan, _, _, _) => scan.partitionFilters
}
val pushedDownPartitionFilters = plan.collectFirst(collectFn)
.map(exps => exps.filterNot(e => e.isInstanceOf[IsNotNull]))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index 71e8e8f79f2..aa0051a54af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -59,7 +59,7 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
.where(Column(predicate))
query.queryExecution.optimizedPlan match {
- case PhysicalOperation(_, filters, DataSourceV2ScanRelation(_, o: OrcScan, _, _)) =>
+ case PhysicalOperation(_, filters, DataSourceV2ScanRelation(_, o: OrcScan, _, _, _)) =>
assert(filters.nonEmpty, "No filter is analyzed from the given query")
assert(o.pushedFilters.nonEmpty, "No filter is pushed down")
val maybeFilter = OrcFilters.createFilter(query.schema, o.pushedFilters)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
index 2b3c7b3c555..c8c823b2018 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -120,7 +120,7 @@ trait OrcTest extends QueryTest with FileBasedDataSourceTest with BeforeAndAfter
.where(Column(predicate))
query.queryExecution.optimizedPlan match {
- case PhysicalOperation(_, filters, DataSourceV2ScanRelation(_, o: OrcScan, _, _)) =>
+ case PhysicalOperation(_, filters, DataSourceV2ScanRelation(_, o: OrcScan, _, _, _)) =>
assert(filters.nonEmpty, "No filter is analyzed from the given query")
if (noneSupported) {
assert(o.pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala
index 7fb6d4c3696..8e6793fe078 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala
@@ -40,7 +40,7 @@ class OrcV2SchemaPruningSuite extends SchemaPruningSuite with AdaptiveSparkPlanH
override def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
val fileSourceScanSchemata =
collect(df.queryExecution.executedPlan) {
- case BatchScanExec(_, scan: OrcScan, _, _) => scan.readDataSchema
+ case BatchScanExec(_, scan: OrcScan, _, _, _) => scan.readDataSchema
}
assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 34bfc7d2c9c..3df99e9d7ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -2155,7 +2155,7 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
query.queryExecution.optimizedPlan.collectFirst {
case PhysicalOperation(_, filters,
- DataSourceV2ScanRelation(_, scan: ParquetScan, _, _)) =>
+ DataSourceV2ScanRelation(_, scan: ParquetScan, _, _, _)) =>
assert(filters.nonEmpty, "No filter is analyzed from the given query")
val sourceFilters = filters.flatMap(DataSourceStrategy.translateFilter(_, true)).toArray
val pushedFilters = scan.pushedFilters