Posted to commits@spark.apache.org by we...@apache.org on 2018/02/20 08:04:28 UTC
spark git commit: [SPARK-23203][SQL] DataSourceV2: Use immutable logical plans.
Repository: spark
Updated Branches:
refs/heads/master 651b0277f -> aadf9535b
[SPARK-23203][SQL] DataSourceV2: Use immutable logical plans.
## What changes were proposed in this pull request?
SPARK-23203: DataSourceV2 should use immutable Catalyst trees instead of wrapping a mutable DataSourceV2Reader. This commit updates DataSourceV2Relation and consolidates most of the DataSourceV2 read-path API handling into it. Instead of wrapping a reader that changes, the relation lazily produces a reader from its configuration.
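For illustration, here is a minimal sketch of that idea in Scala. The `ImmutableV2Relation` name and its two fields are hypothetical simplifications, not the relation defined later in this commit, and it assumes the source implements `ReadSupport`:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.DataSourceReader

// Illustrative relation: all state is immutable configuration; the reader is derived
// from that configuration on demand instead of being wrapped and mutated in place.
case class ImmutableV2Relation(
    source: DataSourceV2,
    options: Map[String, String]) {

  // Push-down is expressed by copying the relation with new configuration; each copy
  // lazily builds its own reader. (Assumes the source implements ReadSupport.)
  lazy val reader: DataSourceReader =
    source.asInstanceOf[ReadSupport].createReader(new DataSourceOptions(options.asJava))
}
```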
This commit also updates predicate and projection push-down. Instead of the implementation from SPARK-22197, it reuses the rule matching from the Hive and DataSource read paths (via `PhysicalOperation`) and copies most of the implementation of `SparkPlanner.pruneFilterProject`, with updates for DataSourceV2. Reusing the implementation from the other read paths should mean fewer regressions and less code to maintain.
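Conceptually, the new rule has the following shape. This is a condensed, hedged sketch of `PushDownOperatorsToDataSource` from the diff below; the `PushDownSketch` name is illustrative and the projection-widening logic is simplified away here:

```scala
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

object PushDownSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    // PhysicalOperation collects the Project and deterministic Filter nodes sitting
    // above the relation, so one case yields (projection, filter conjuncts, relation).
    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
      // Record what was pushed by copying the immutable relation; the real rule also
      // widens the projection to cover filter-only columns (omitted in this sketch).
      val pushed = relation.copy(
        projection = project.map(_.toAttribute.asInstanceOf[AttributeReference]),
        filters = Some(filters))
      // Filters the source could not handle must remain as a Filter node in the plan.
      pushed.unsupportedFilters.reduceLeftOption(And)
        .map(Filter(_, pushed))
        .getOrElse(pushed)
  }
}
```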
The new push-down rules also support the following edge cases (a sketch of the projection handling follows this list):
* The output of DataSourceV2Relation should be what is returned by the reader, in case the reader can only partially satisfy the requested schema projection
* The requested projection passed to the DataSourceV2Reader should include filter columns
* The push-down rule may be run more than once if filters are not pushed through projections
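The second bullet can be sketched with a simplified, hypothetical helper (`ProjectionSketch.requiredProjection`) that mirrors the projection logic added to `PushDownOperatorsToDataSource` in the diff below:

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, NamedExpression}

object ProjectionSketch {
  // Compute the projection to push to the reader: it must also cover columns that are
  // referenced only by pushed filters, or the reader could prune them away.
  def requiredProjection(
      project: Seq[NamedExpression],
      filters: Seq[Expression]): Seq[Attribute] = {
    val projectAttrs = project.map(_.toAttribute)
    val projectSet = AttributeSet(project.flatMap(_.references))
    val filterSet = AttributeSet(filters.flatMap(_.references))

    if (filterSet.subsetOf(projectSet) && AttributeSet(projectAttrs) == projectSet) {
      // Column pruning alone produces the requested projection: push it unchanged.
      projectAttrs
    } else {
      // Otherwise prune to every column needed by either the projection or the filters.
      (projectSet ++ filterSet).toSeq
    }
  }
}
```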
## How was this patch tested?
Existing push-down and read tests.
Author: Ryan Blue <bl...@apache.org>
Closes #20387 from rdblue/SPARK-22386-push-down-immutable-trees.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aadf9535
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aadf9535
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aadf9535
Branch: refs/heads/master
Commit: aadf9535b4a11b42fd9d72f636576d2da0766199
Parents: 651b027
Author: Ryan Blue <bl...@apache.org>
Authored: Tue Feb 20 16:04:22 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Feb 20 16:04:22 2018 +0800
----------------------------------------------------------------------
.../kafka010/KafkaContinuousSourceSuite.scala | 19 +-
.../sql/kafka010/KafkaContinuousTest.scala | 4 +-
.../kafka010/KafkaMicroBatchSourceSuite.scala | 4 +-
.../org/apache/spark/sql/DataFrameReader.scala | 41 +---
.../datasources/v2/DataSourceV2Relation.scala | 212 +++++++++++++++++--
.../datasources/v2/DataSourceV2Strategy.scala | 7 +-
.../v2/PushDownOperatorsToDataSource.scala | 159 ++++----------
.../continuous/ContinuousExecution.scala | 2 +-
.../sql/sources/v2/DataSourceV2Suite.scala | 2 +-
.../apache/spark/sql/streaming/StreamTest.scala | 6 +-
10 files changed, 269 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index a7083fa..f679e9b 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -17,20 +17,9 @@
package org.apache.spark.sql.kafka010
-import java.util.Properties
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.scalatest.time.SpanSugar._
-import scala.collection.mutable
-import scala.util.Random
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.streaming.StreamExecution
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
-import org.apache.spark.sql.streaming.{StreamTest, Trigger}
-import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.streaming.Trigger
// Run tests in KafkaSourceSuiteBase in continuous execution mode.
class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest
@@ -71,7 +60,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.logical.collectFirst {
- case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
+ case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
}.exists { r =>
// Ensure the new topic is present and the old topic is gone.
r.knownPartitions.exists(_.topic == topic2)
http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index 5a1a14f..48ac3fc 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.streaming.Trigger
@@ -47,7 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.logical.collectFirst {
- case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
+ case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
}.exists(_.knownPartitions.size == newCount),
s"query never reconfigured to $newCount partitions")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index ed4ecfe..89c9ef4 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -35,7 +35,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Dataset, ForeachWriter}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.functions.{count, window}
@@ -119,7 +119,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
} ++ (query.get.lastExecution match {
case null => Seq()
case e => e.logical.collect {
- case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
+ case StreamingDataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
}
})
}.distinct
http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index fcaf8d6..4274f12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
-import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.{DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String
@@ -189,39 +189,16 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
- val ds = cls.newInstance()
- val options = new DataSourceOptions((extraOptions ++
- DataSourceV2Utils.extractSessionConfigs(
- ds = ds.asInstanceOf[DataSourceV2],
- conf = sparkSession.sessionState.conf)).asJava)
-
- // Streaming also uses the data source V2 API. So it may be that the data source implements
- // v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
- // the dataframe as a v1 source.
- val reader = (ds, userSpecifiedSchema) match {
- case (ds: ReadSupportWithSchema, Some(schema)) =>
- ds.createReader(schema, options)
-
- case (ds: ReadSupport, None) =>
- ds.createReader(options)
-
- case (ds: ReadSupportWithSchema, None) =>
- throw new AnalysisException(s"A schema needs to be specified when using $ds.")
-
- case (ds: ReadSupport, Some(schema)) =>
- val reader = ds.createReader(options)
- if (reader.readSchema() != schema) {
- throw new AnalysisException(s"$ds does not allow user-specified schemas.")
- }
- reader
-
- case _ => null // fall back to v1
- }
+ val ds = cls.newInstance().asInstanceOf[DataSourceV2]
+ if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) {
+ val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+ ds = ds, conf = sparkSession.sessionState.conf)
+ Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
+ ds, extraOptions.toMap ++ sessionOptions,
+ userSpecifiedSchema = userSpecifiedSchema))
- if (reader == null) {
- loadV1Source(paths: _*)
} else {
- Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
+ loadV1Source(paths: _*)
}
} else {
loadV1Source(paths: _*)
http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 38f6b15..a98dd48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -17,17 +17,80 @@
package org.apache.spark.sql.execution.datasources.v2
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.types.StructType
case class DataSourceV2Relation(
- output: Seq[AttributeReference],
- reader: DataSourceReader)
- extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+ source: DataSourceV2,
+ options: Map[String, String],
+ projection: Seq[AttributeReference],
+ filters: Option[Seq[Expression]] = None,
+ userSpecifiedSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation {
+
+ import DataSourceV2Relation._
+
+ override def simpleString: String = {
+ s"DataSourceV2Relation(source=${source.name}, " +
+ s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " +
+ s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+ }
+
+ override lazy val schema: StructType = reader.readSchema()
+
+ override lazy val output: Seq[AttributeReference] = {
+ // use the projection attributes to avoid assigning new ids. fields that are not projected
+ // will be assigned new ids, which is okay because they are not projected.
+ val attrMap = projection.map(a => a.name -> a).toMap
+ schema.map(f => attrMap.getOrElse(f.name,
+ AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
+ }
+
+ private lazy val v2Options: DataSourceOptions = makeV2Options(options)
+
+ lazy val (
+ reader: DataSourceReader,
+ unsupportedFilters: Seq[Expression],
+ pushedFilters: Seq[Expression]) = {
+ val newReader = userSpecifiedSchema match {
+ case Some(s) =>
+ source.asReadSupportWithSchema.createReader(s, v2Options)
+ case _ =>
+ source.asReadSupport.createReader(v2Options)
+ }
+
+ DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType)
- override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]
+ val (remainingFilters, pushedFilters) = filters match {
+ case Some(filterSeq) =>
+ DataSourceV2Relation.pushFilters(newReader, filterSeq)
+ case _ =>
+ (Nil, Nil)
+ }
+
+ (newReader, remainingFilters, pushedFilters)
+ }
+
+ override def doCanonicalize(): LogicalPlan = {
+ val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]
+
+ // override output with canonicalized output to avoid attempting to configure a reader
+ val canonicalOutput: Seq[AttributeReference] = this.output
+ .map(a => QueryPlan.normalizeExprId(a, projection))
+
+ new DataSourceV2Relation(c.source, c.options, c.projection) {
+ override lazy val output: Seq[AttributeReference] = canonicalOutput
+ }
+ }
override def computeStats(): Statistics = reader match {
case r: SupportsReportStatistics =>
@@ -37,7 +100,9 @@ case class DataSourceV2Relation(
}
override def newInstance(): DataSourceV2Relation = {
- copy(output = output.map(_.newInstance()))
+ // projection is used to maintain id assignment.
+ // if projection is not set, use output so the copy is not equal to the original
+ copy(projection = projection.map(_.newInstance()))
}
}
@@ -45,14 +110,137 @@ case class DataSourceV2Relation(
* A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
* to the non-streaming relation.
*/
-class StreamingDataSourceV2Relation(
+case class StreamingDataSourceV2Relation(
output: Seq[AttributeReference],
- reader: DataSourceReader) extends DataSourceV2Relation(output, reader) {
+ reader: DataSourceReader)
+ extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation {
override def isStreaming: Boolean = true
+
+ override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation]
+
+ override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
+
+ override def computeStats(): Statistics = reader match {
+ case r: SupportsReportStatistics =>
+ Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
+ case _ =>
+ Statistics(sizeInBytes = conf.defaultSizeInBytes)
+ }
}
object DataSourceV2Relation {
- def apply(reader: DataSourceReader): DataSourceV2Relation = {
- new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
+ private implicit class SourceHelpers(source: DataSourceV2) {
+ def asReadSupport: ReadSupport = {
+ source match {
+ case support: ReadSupport =>
+ support
+ case _: ReadSupportWithSchema =>
+ // this method is only called if there is no user-supplied schema. if there is no
+ // user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
+ throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
+ case _ =>
+ throw new AnalysisException(s"Data source is not readable: $name")
+ }
+ }
+
+ def asReadSupportWithSchema: ReadSupportWithSchema = {
+ source match {
+ case support: ReadSupportWithSchema =>
+ support
+ case _: ReadSupport =>
+ throw new AnalysisException(
+ s"Data source does not support user-supplied schema: $name")
+ case _ =>
+ throw new AnalysisException(s"Data source is not readable: $name")
+ }
+ }
+
+ def name: String = {
+ source match {
+ case registered: DataSourceRegister =>
+ registered.shortName()
+ case _ =>
+ source.getClass.getSimpleName
+ }
+ }
+ }
+
+ private def makeV2Options(options: Map[String, String]): DataSourceOptions = {
+ new DataSourceOptions(options.asJava)
+ }
+
+ private def schema(
+ source: DataSourceV2,
+ v2Options: DataSourceOptions,
+ userSchema: Option[StructType]): StructType = {
+ val reader = userSchema match {
+ // TODO: remove this case because it is confusing for users
+ case Some(s) if !source.isInstanceOf[ReadSupportWithSchema] =>
+ val reader = source.asReadSupport.createReader(v2Options)
+ if (reader.readSchema() != s) {
+ throw new AnalysisException(s"${source.name} does not allow user-specified schemas.")
+ }
+ reader
+ case Some(s) =>
+ source.asReadSupportWithSchema.createReader(s, v2Options)
+ case _ =>
+ source.asReadSupport.createReader(v2Options)
+ }
+ reader.readSchema()
+ }
+
+ def create(
+ source: DataSourceV2,
+ options: Map[String, String],
+ filters: Option[Seq[Expression]] = None,
+ userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
+ val projection = schema(source, makeV2Options(options), userSpecifiedSchema).toAttributes
+ DataSourceV2Relation(source, options, projection, filters,
+ // if the source does not implement ReadSupportWithSchema, then the userSpecifiedSchema must
+ // be equal to the reader's schema. the schema method enforces this. because the user schema
+ // and the reader's schema are identical, drop the user schema.
+ if (source.isInstanceOf[ReadSupportWithSchema]) userSpecifiedSchema else None)
+ }
+
+ private def pushRequiredColumns(reader: DataSourceReader, struct: StructType): Unit = {
+ reader match {
+ case projectionSupport: SupportsPushDownRequiredColumns =>
+ projectionSupport.pruneColumns(struct)
+ case _ =>
+ }
+ }
+
+ private def pushFilters(
+ reader: DataSourceReader,
+ filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+ reader match {
+ case catalystFilterSupport: SupportsPushDownCatalystFilters =>
+ (
+ catalystFilterSupport.pushCatalystFilters(filters.toArray),
+ catalystFilterSupport.pushedCatalystFilters()
+ )
+
+ case filterSupport: SupportsPushDownFilters =>
+ // A map from original Catalyst expressions to corresponding translated data source
+ // filters. If a predicate is not in this map, it means it cannot be pushed down.
+ val translatedMap: Map[Expression, Filter] = filters.flatMap { p =>
+ DataSourceStrategy.translateFilter(p).map(f => p -> f)
+ }.toMap
+
+ // Catalyst predicate expressions that cannot be converted to data source filters.
+ val nonConvertiblePredicates = filters.filterNot(translatedMap.contains)
+
+ // Data source filters that cannot be pushed down. An unhandled filter means
+ // the data source cannot guarantee the rows returned can pass the filter.
+ // As a result we must return it so Spark can plan an extra filter operator.
+ val unhandledFilters = filterSupport.pushFilters(translatedMap.values.toArray).toSet
+ val (unhandledPredicates, pushedPredicates) = translatedMap.partition { case (_, f) =>
+ unhandledFilters.contains(f)
+ }
+
+ (nonConvertiblePredicates ++ unhandledPredicates.keys, pushedPredicates.keys.toSeq)
+
+ case _ => (filters, Nil)
+ }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index df5b524..c4e7644 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -23,8 +23,11 @@ import org.apache.spark.sql.execution.SparkPlan
object DataSourceV2Strategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case DataSourceV2Relation(output, reader) =>
- DataSourceV2ScanExec(output, reader) :: Nil
+ case relation: DataSourceV2Relation =>
+ DataSourceV2ScanExec(relation.output, relation.reader) :: Nil
+
+ case relation: StreamingDataSourceV2Relation =>
+ DataSourceV2ScanExec(relation.output, relation.reader) :: Nil
case WriteToDataSourceV2(writer, query) =>
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
index 1ca6cbf..f23d228 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
@@ -17,130 +17,55 @@
package org.apache.spark.sql.execution.datasources.v2
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, AttributeSet, Expression, NamedExpression, PredicateHelper}
-import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources
-import org.apache.spark.sql.sources.v2.reader._
-/**
- * Pushes down various operators to the underlying data source for better performance. Operators are
- * being pushed down with a specific order. As an example, given a LIMIT has a FILTER child, you
- * can't push down LIMIT if FILTER is not completely pushed down. When both are pushed down, the
- * data source should execute FILTER before LIMIT. And required columns are calculated at the end,
- * because when more operators are pushed down, we may need less columns at Spark side.
- */
-object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHelper {
- override def apply(plan: LogicalPlan): LogicalPlan = {
- // Note that, we need to collect the target operator along with PROJECT node, as PROJECT may
- // appear in many places for column pruning.
- // TODO: Ideally column pruning should be implemented via a plan property that is propagated
- // top-down, then we can simplify the logic here and only collect target operators.
- val filterPushed = plan transformUp {
- case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, reader)) =>
- val (candidates, nonDeterministic) =
- splitConjunctivePredicates(condition).partition(_.deterministic)
-
- val stayUpFilters: Seq[Expression] = reader match {
- case r: SupportsPushDownCatalystFilters =>
- r.pushCatalystFilters(candidates.toArray)
-
- case r: SupportsPushDownFilters =>
- // A map from original Catalyst expressions to corresponding translated data source
- // filters. If a predicate is not in this map, it means it cannot be pushed down.
- val translatedMap: Map[Expression, sources.Filter] = candidates.flatMap { p =>
- DataSourceStrategy.translateFilter(p).map(f => p -> f)
- }.toMap
-
- // Catalyst predicate expressions that cannot be converted to data source filters.
- val nonConvertiblePredicates = candidates.filterNot(translatedMap.contains)
-
- // Data source filters that cannot be pushed down. An unhandled filter means
- // the data source cannot guarantee the rows returned can pass the filter.
- // As a result we must return it so Spark can plan an extra filter operator.
- val unhandledFilters = r.pushFilters(translatedMap.values.toArray).toSet
- val unhandledPredicates = translatedMap.filter { case (_, f) =>
- unhandledFilters.contains(f)
- }.keys
-
- nonConvertiblePredicates ++ unhandledPredicates
-
- case _ => candidates
- }
-
- val filterCondition = (stayUpFilters ++ nonDeterministic).reduceLeftOption(And)
- val withFilter = filterCondition.map(Filter(_, r)).getOrElse(r)
- if (withFilter.output == fields) {
- withFilter
- } else {
- Project(fields, withFilter)
- }
- }
-
- // TODO: add more push down rules.
-
- val columnPruned = pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
- // After column pruning, we may have redundant PROJECT nodes in the query plan, remove them.
- RemoveRedundantProject(columnPruned)
- }
-
- // TODO: nested fields pruning
- private def pushDownRequiredColumns(
- plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = {
- plan match {
- case p @ Project(projectList, child) =>
- val required = projectList.flatMap(_.references)
- p.copy(child = pushDownRequiredColumns(child, AttributeSet(required)))
-
- case f @ Filter(condition, child) =>
- val required = requiredByParent ++ condition.references
- f.copy(child = pushDownRequiredColumns(child, required))
-
- case relation: DataSourceV2Relation => relation.reader match {
- case reader: SupportsPushDownRequiredColumns =>
- // TODO: Enable the below assert after we make `DataSourceV2Relation` immutable. Fow now
- // it's possible that the mutable reader being updated by someone else, and we need to
- // always call `reader.pruneColumns` here to correct it.
- // assert(relation.output.toStructType == reader.readSchema(),
- // "Schema of data source reader does not match the relation plan.")
-
- val requiredColumns = relation.output.filter(requiredByParent.contains)
- reader.pruneColumns(requiredColumns.toStructType)
-
- val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
- val newOutput = reader.readSchema().map(_.name).map(nameToAttr)
- relation.copy(output = newOutput)
-
- case _ => relation
+object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
+ override def apply(
+ plan: LogicalPlan): LogicalPlan = plan transformUp {
+ // PhysicalOperation guarantees that filters are deterministic; no need to check
+ case PhysicalOperation(project, newFilters, relation : DataSourceV2Relation) =>
+ // merge the filters
+ val filters = relation.filters match {
+ case Some(existing) =>
+ existing ++ newFilters
+ case _ =>
+ newFilters
}
- // TODO: there may be more operators that can be used to calculate the required columns. We
- // can add more and more in the future.
- case _ => plan.mapChildren(c => pushDownRequiredColumns(c, c.outputSet))
- }
- }
-
- /**
- * Finds a Filter node(with an optional Project child) above data source relation.
- */
- object FilterAndProject {
- // returns the project list, the filter condition and the data source relation.
- def unapply(plan: LogicalPlan)
- : Option[(Seq[NamedExpression], Expression, DataSourceV2Relation)] = plan match {
+ val projectAttrs = project.map(_.toAttribute)
+ val projectSet = AttributeSet(project.flatMap(_.references))
+ val filterSet = AttributeSet(filters.flatMap(_.references))
+
+ val projection = if (filterSet.subsetOf(projectSet) &&
+ AttributeSet(projectAttrs) == projectSet) {
+ // When the required projection contains all of the filter columns and column pruning alone
+ // can produce the required projection, push the required projection.
+ // A final projection may still be needed if the data source produces a different column
+ // order or if it cannot prune all of the nested columns.
+ projectAttrs
+ } else {
+ // When there are filter columns not already in the required projection or when the required
+ // projection is more complicated than column pruning, base column pruning on the set of
+ // all columns needed by both.
+ (projectSet ++ filterSet).toSeq
+ }
- case Filter(condition, r: DataSourceV2Relation) => Some((r.output, condition, r))
+ val newRelation = relation.copy(
+ projection = projection.asInstanceOf[Seq[AttributeReference]],
+ filters = Some(filters))
- case Filter(condition, Project(fields, r: DataSourceV2Relation))
- if fields.forall(_.deterministic) =>
- val attributeMap = AttributeMap(fields.map(e => e.toAttribute -> e))
- val substituted = condition.transform {
- case a: Attribute => attributeMap.getOrElse(a, a)
- }
- Some((fields, substituted, r))
+ // Add a Filter for any filters that could not be pushed
+ val unpushedFilter = newRelation.unsupportedFilters.reduceLeftOption(And)
+ val filtered = unpushedFilter.map(Filter(_, newRelation)).getOrElse(newRelation)
- case _ => None
- }
+ // Add a Project to ensure the output matches the required projection
+ if (newRelation.output != projectAttrs) {
+ Project(project, filtered)
+ } else {
+ filtered
+ }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index c3294d6..2c1d6c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -201,7 +201,7 @@ class ContinuousExecution(
val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan)
val reader = withSink.collect {
- case DataSourceV2Relation(_, r: ContinuousReader) => r
+ case StreamingDataSourceV2Relation(_, r: ContinuousReader) => r
}.head
reportTimeTaken("queryPlanning") {
http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index a1c87fb..1157a35 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -146,7 +146,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
Seq(classOf[SchemaRequiredDataSource], classOf[JavaSchemaRequiredDataSource]).foreach { cls =>
withClue(cls.getName) {
val e = intercept[AnalysisException](spark.read.format(cls.getName).load())
- assert(e.message.contains("A schema needs to be specified"))
+ assert(e.message.contains("requires a user-supplied schema"))
val schema = new StructType().add("i", "int").add("s", "string")
val df = spark.read.format(cls.getName).schema(schema).load()
http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 37fe595..159dd0e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -38,9 +38,9 @@ import org.apache.spark.sql.{Dataset, Encoder, QueryTest, Row}
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, ContinuousTrigger, EpochCoordinatorRef, IncrementAndGetEpoch}
+import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochCoordinatorRef, IncrementAndGetEpoch}
import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
import org.apache.spark.sql.execution.streaming.state.StateStore
import org.apache.spark.sql.streaming.StreamingQueryListener._
@@ -605,7 +605,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
plan
.collect {
case StreamingExecutionRelation(s, _) => s
- case DataSourceV2Relation(_, r) => r
+ case StreamingDataSourceV2Relation(_, r) => r
}
.zipWithIndex
.find(_._1 == source)