You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/02/20 08:04:28 UTC

spark git commit: [SPARK-23203][SQL] DataSourceV2: Use immutable logical plans.

Repository: spark
Updated Branches:
  refs/heads/master 651b0277f -> aadf9535b


[SPARK-23203][SQL] DataSourceV2: Use immutable logical plans.

## What changes were proposed in this pull request?

SPARK-23203: DataSourceV2 should use immutable catalyst trees instead of wrapping a mutable DataSourceV2Reader. This commit updates DataSourceV2Relation and consolidates much of the DataSourceV2 API requirements for the read path in it. Instead of wrapping a reader that changes, the relation lazily produces a reader from its configuration.

This commit also updates the predicate and projection push-down. Instead of the implementation from SPARK-22197, this reuses the rule matching from the Hive and DataSource read paths (using `PhysicalOperation`) and copies most of the implementation of `SparkPlanner.pruneFilterProject`, with updates for DataSourceV2. Because it reuses the implementation from the other read paths, this should introduce fewer regressions relative to those paths and leaves less code to maintain.

The new push-down rules also support the following edge cases:

* The output of DataSourceV2Relation should be what is returned by the reader, in case the reader can only partially satisfy the requested schema projection
* The requested projection passed to the DataSourceV2Reader should include filter columns
* The push-down rule may be run more than once if filters are not pushed through projections

## How was this patch tested?

Existing push-down and read tests.

Author: Ryan Blue <bl...@apache.org>

Closes #20387 from rdblue/SPARK-22386-push-down-immutable-trees.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aadf9535
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aadf9535
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aadf9535

Branch: refs/heads/master
Commit: aadf9535b4a11b42fd9d72f636576d2da0766199
Parents: 651b027
Author: Ryan Blue <bl...@apache.org>
Authored: Tue Feb 20 16:04:22 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Feb 20 16:04:22 2018 +0800

----------------------------------------------------------------------
 .../kafka010/KafkaContinuousSourceSuite.scala   |  19 +-
 .../sql/kafka010/KafkaContinuousTest.scala      |   4 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala   |   4 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |  41 +---
 .../datasources/v2/DataSourceV2Relation.scala   | 212 +++++++++++++++++--
 .../datasources/v2/DataSourceV2Strategy.scala   |   7 +-
 .../v2/PushDownOperatorsToDataSource.scala      | 159 ++++----------
 .../continuous/ContinuousExecution.scala        |   2 +-
 .../sql/sources/v2/DataSourceV2Suite.scala      |   2 +-
 .../apache/spark/sql/streaming/StreamTest.scala |   6 +-
 10 files changed, 269 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index a7083fa..f679e9b 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -17,20 +17,9 @@
 
 package org.apache.spark.sql.kafka010
 
-import java.util.Properties
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.scalatest.time.SpanSugar._
-import scala.collection.mutable
-import scala.util.Random
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.streaming.StreamExecution
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
-import org.apache.spark.sql.streaming.{StreamTest, Trigger}
-import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.streaming.Trigger
 
 // Run tests in KafkaSourceSuiteBase in continuous execution mode.
 class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest
@@ -71,7 +60,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
         eventually(timeout(streamingTimeout)) {
           assert(
             query.lastExecution.logical.collectFirst {
-              case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
+              case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
             }.exists { r =>
               // Ensure the new topic is present and the old topic is gone.
               r.knownPartitions.exists(_.topic == topic2)

http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index 5a1a14f..48ac3fc 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.streaming.Trigger
@@ -47,7 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
     eventually(timeout(streamingTimeout)) {
       assert(
         query.lastExecution.logical.collectFirst {
-          case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
+          case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
         }.exists(_.knownPartitions.size == newCount),
         s"query never reconfigured to $newCount partitions")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index ed4ecfe..89c9ef4 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -35,7 +35,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Dataset, ForeachWriter}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.functions.{count, window}
@@ -119,7 +119,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
         } ++ (query.get.lastExecution match {
           case null => Seq()
           case e => e.logical.collect {
-            case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
+            case StreamingDataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
           }
         })
       }.distinct

http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index fcaf8d6..4274f12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
-import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.{DataSourceV2, ReadSupport, ReadSupportWithSchema}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -189,39 +189,16 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 
     val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-      val ds = cls.newInstance()
-      val options = new DataSourceOptions((extraOptions ++
-        DataSourceV2Utils.extractSessionConfigs(
-          ds = ds.asInstanceOf[DataSourceV2],
-          conf = sparkSession.sessionState.conf)).asJava)
-
-      // Streaming also uses the data source V2 API. So it may be that the data source implements
-      // v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
-      // the dataframe as a v1 source.
-      val reader = (ds, userSpecifiedSchema) match {
-        case (ds: ReadSupportWithSchema, Some(schema)) =>
-          ds.createReader(schema, options)
-
-        case (ds: ReadSupport, None) =>
-          ds.createReader(options)
-
-        case (ds: ReadSupportWithSchema, None) =>
-          throw new AnalysisException(s"A schema needs to be specified when using $ds.")
-
-        case (ds: ReadSupport, Some(schema)) =>
-          val reader = ds.createReader(options)
-          if (reader.readSchema() != schema) {
-            throw new AnalysisException(s"$ds does not allow user-specified schemas.")
-          }
-          reader
-
-        case _ => null // fall back to v1
-      }
+      val ds = cls.newInstance().asInstanceOf[DataSourceV2]
+      if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) {
+        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+          ds = ds, conf = sparkSession.sessionState.conf)
+        Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
+          ds, extraOptions.toMap ++ sessionOptions,
+          userSpecifiedSchema = userSpecifiedSchema))
 
-      if (reader == null) {
-        loadV1Source(paths: _*)
       } else {
-        Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
+        loadV1Source(paths: _*)
       }
     } else {
       loadV1Source(paths: _*)

http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 38f6b15..a98dd48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -17,17 +17,80 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-    output: Seq[AttributeReference],
-    reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+    source: DataSourceV2,
+    options: Map[String, String],
+    projection: Seq[AttributeReference],
+    filters: Option[Seq[Expression]] = None,
+    userSpecifiedSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation {
+
+  import DataSourceV2Relation._
+
+  override def simpleString: String = {
+    s"DataSourceV2Relation(source=${source.name}, " +
+      s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " +
+      s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+    // use the projection attributes to avoid assigning new ids. fields that are not projected
+    // will be assigned new ids, which is okay because they are not projected.
+    val attrMap = projection.map(a => a.name -> a).toMap
+    schema.map(f => attrMap.getOrElse(f.name,
+      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
+  }
+
+  private lazy val v2Options: DataSourceOptions = makeV2Options(options)
+
+  lazy val (
+      reader: DataSourceReader,
+      unsupportedFilters: Seq[Expression],
+      pushedFilters: Seq[Expression]) = {
+    val newReader = userSpecifiedSchema match {
+      case Some(s) =>
+        source.asReadSupportWithSchema.createReader(s, v2Options)
+      case _ =>
+        source.asReadSupport.createReader(v2Options)
+    }
+
+    DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType)
 
-  override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]
+    val (remainingFilters, pushedFilters) = filters match {
+      case Some(filterSeq) =>
+        DataSourceV2Relation.pushFilters(newReader, filterSeq)
+      case _ =>
+        (Nil, Nil)
+    }
+
+    (newReader, remainingFilters, pushedFilters)
+  }
+
+  override def doCanonicalize(): LogicalPlan = {
+    val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]
+
+    // override output with canonicalized output to avoid attempting to configure a reader
+    val canonicalOutput: Seq[AttributeReference] = this.output
+        .map(a => QueryPlan.normalizeExprId(a, projection))
+
+    new DataSourceV2Relation(c.source, c.options, c.projection) {
+      override lazy val output: Seq[AttributeReference] = canonicalOutput
+    }
+  }
 
   override def computeStats(): Statistics = reader match {
     case r: SupportsReportStatistics =>
@@ -37,7 +100,9 @@ case class DataSourceV2Relation(
   }
 
   override def newInstance(): DataSourceV2Relation = {
-    copy(output = output.map(_.newInstance()))
+    // projection is used to maintain id assignment.
+    // if projection is not set, use output so the copy is not equal to the original
+    copy(projection = projection.map(_.newInstance()))
   }
 }
 
@@ -45,14 +110,137 @@ case class DataSourceV2Relation(
  * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
  * to the non-streaming relation.
  */
-class StreamingDataSourceV2Relation(
+case class StreamingDataSourceV2Relation(
     output: Seq[AttributeReference],
-    reader: DataSourceReader) extends DataSourceV2Relation(output, reader) {
+    reader: DataSourceReader)
+    extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation {
   override def isStreaming: Boolean = true
+
+  override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation]
+
+  override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
+
+  override def computeStats(): Statistics = reader match {
+    case r: SupportsReportStatistics =>
+      Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
+    case _ =>
+      Statistics(sizeInBytes = conf.defaultSizeInBytes)
+  }
 }
 
 object DataSourceV2Relation {
-  def apply(reader: DataSourceReader): DataSourceV2Relation = {
-    new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
+  private implicit class SourceHelpers(source: DataSourceV2) {
+    def asReadSupport: ReadSupport = {
+      source match {
+        case support: ReadSupport =>
+          support
+        case _: ReadSupportWithSchema =>
+          // this method is only called if there is no user-supplied schema. if there is no
+          // user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
+          throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
+        case _ =>
+          throw new AnalysisException(s"Data source is not readable: $name")
+      }
+    }
+
+    def asReadSupportWithSchema: ReadSupportWithSchema = {
+      source match {
+        case support: ReadSupportWithSchema =>
+          support
+        case _: ReadSupport =>
+          throw new AnalysisException(
+            s"Data source does not support user-supplied schema: $name")
+        case _ =>
+          throw new AnalysisException(s"Data source is not readable: $name")
+      }
+    }
+
+    def name: String = {
+      source match {
+        case registered: DataSourceRegister =>
+          registered.shortName()
+        case _ =>
+          source.getClass.getSimpleName
+      }
+    }
+  }
+
+  private def makeV2Options(options: Map[String, String]): DataSourceOptions = {
+    new DataSourceOptions(options.asJava)
+  }
+
+  private def schema(
+      source: DataSourceV2,
+      v2Options: DataSourceOptions,
+      userSchema: Option[StructType]): StructType = {
+    val reader = userSchema match {
+      // TODO: remove this case because it is confusing for users
+      case Some(s) if !source.isInstanceOf[ReadSupportWithSchema] =>
+        val reader = source.asReadSupport.createReader(v2Options)
+        if (reader.readSchema() != s) {
+          throw new AnalysisException(s"${source.name} does not allow user-specified schemas.")
+        }
+        reader
+      case Some(s) =>
+        source.asReadSupportWithSchema.createReader(s, v2Options)
+      case _ =>
+        source.asReadSupport.createReader(v2Options)
+    }
+    reader.readSchema()
+  }
+
+  def create(
+      source: DataSourceV2,
+      options: Map[String, String],
+      filters: Option[Seq[Expression]] = None,
+      userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
+    val projection = schema(source, makeV2Options(options), userSpecifiedSchema).toAttributes
+    DataSourceV2Relation(source, options, projection, filters,
+      // if the source does not implement ReadSupportWithSchema, then the userSpecifiedSchema must
+      // be equal to the reader's schema. the schema method enforces this. because the user schema
+      // and the reader's schema are identical, drop the user schema.
+      if (source.isInstanceOf[ReadSupportWithSchema]) userSpecifiedSchema else None)
+  }
+
+  private def pushRequiredColumns(reader: DataSourceReader, struct: StructType): Unit = {
+    reader match {
+      case projectionSupport: SupportsPushDownRequiredColumns =>
+        projectionSupport.pruneColumns(struct)
+      case _ =>
+    }
+  }
+
+  private def pushFilters(
+      reader: DataSourceReader,
+      filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+    reader match {
+      case catalystFilterSupport: SupportsPushDownCatalystFilters =>
+        (
+            catalystFilterSupport.pushCatalystFilters(filters.toArray),
+            catalystFilterSupport.pushedCatalystFilters()
+        )
+
+      case filterSupport: SupportsPushDownFilters =>
+        // A map from original Catalyst expressions to corresponding translated data source
+        // filters. If a predicate is not in this map, it means it cannot be pushed down.
+        val translatedMap: Map[Expression, Filter] = filters.flatMap { p =>
+          DataSourceStrategy.translateFilter(p).map(f => p -> f)
+        }.toMap
+
+        // Catalyst predicate expressions that cannot be converted to data source filters.
+        val nonConvertiblePredicates = filters.filterNot(translatedMap.contains)
+
+        // Data source filters that cannot be pushed down. An unhandled filter means
+        // the data source cannot guarantee the rows returned can pass the filter.
+        // As a result we must return it so Spark can plan an extra filter operator.
+        val unhandledFilters = filterSupport.pushFilters(translatedMap.values.toArray).toSet
+        val (unhandledPredicates, pushedPredicates) = translatedMap.partition { case (_, f) =>
+          unhandledFilters.contains(f)
+        }
+
+        (nonConvertiblePredicates ++ unhandledPredicates.keys, pushedPredicates.keys.toSeq)
+
+      case _ => (filters, Nil)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index df5b524..c4e7644 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -23,8 +23,11 @@ import org.apache.spark.sql.execution.SparkPlan
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case DataSourceV2Relation(output, reader) =>
-      DataSourceV2ScanExec(output, reader) :: Nil
+    case relation: DataSourceV2Relation =>
+      DataSourceV2ScanExec(relation.output, relation.reader) :: Nil
+
+    case relation: StreamingDataSourceV2Relation =>
+      DataSourceV2ScanExec(relation.output, relation.reader) :: Nil
 
     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
index 1ca6cbf..f23d228 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
@@ -17,130 +17,55 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, AttributeSet, Expression, NamedExpression, PredicateHelper}
-import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources
-import org.apache.spark.sql.sources.v2.reader._
 
-/**
- * Pushes down various operators to the underlying data source for better performance. Operators are
- * being pushed down with a specific order. As an example, given a LIMIT has a FILTER child, you
- * can't push down LIMIT if FILTER is not completely pushed down. When both are pushed down, the
- * data source should execute FILTER before LIMIT. And required columns are calculated at the end,
- * because when more operators are pushed down, we may need less columns at Spark side.
- */
-object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    // Note that, we need to collect the target operator along with PROJECT node, as PROJECT may
-    // appear in many places for column pruning.
-    // TODO: Ideally column pruning should be implemented via a plan property that is propagated
-    // top-down, then we can simplify the logic here and only collect target operators.
-    val filterPushed = plan transformUp {
-      case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, reader)) =>
-        val (candidates, nonDeterministic) =
-          splitConjunctivePredicates(condition).partition(_.deterministic)
-
-        val stayUpFilters: Seq[Expression] = reader match {
-          case r: SupportsPushDownCatalystFilters =>
-            r.pushCatalystFilters(candidates.toArray)
-
-          case r: SupportsPushDownFilters =>
-            // A map from original Catalyst expressions to corresponding translated data source
-            // filters. If a predicate is not in this map, it means it cannot be pushed down.
-            val translatedMap: Map[Expression, sources.Filter] = candidates.flatMap { p =>
-              DataSourceStrategy.translateFilter(p).map(f => p -> f)
-            }.toMap
-
-            // Catalyst predicate expressions that cannot be converted to data source filters.
-            val nonConvertiblePredicates = candidates.filterNot(translatedMap.contains)
-
-            // Data source filters that cannot be pushed down. An unhandled filter means
-            // the data source cannot guarantee the rows returned can pass the filter.
-            // As a result we must return it so Spark can plan an extra filter operator.
-            val unhandledFilters = r.pushFilters(translatedMap.values.toArray).toSet
-            val unhandledPredicates = translatedMap.filter { case (_, f) =>
-              unhandledFilters.contains(f)
-            }.keys
-
-            nonConvertiblePredicates ++ unhandledPredicates
-
-          case _ => candidates
-        }
-
-        val filterCondition = (stayUpFilters ++ nonDeterministic).reduceLeftOption(And)
-        val withFilter = filterCondition.map(Filter(_, r)).getOrElse(r)
-        if (withFilter.output == fields) {
-          withFilter
-        } else {
-          Project(fields, withFilter)
-        }
-    }
-
-    // TODO: add more push down rules.
-
-    val columnPruned = pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
-    // After column pruning, we may have redundant PROJECT nodes in the query plan, remove them.
-    RemoveRedundantProject(columnPruned)
-  }
-
-  // TODO: nested fields pruning
-  private def pushDownRequiredColumns(
-      plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = {
-    plan match {
-      case p @ Project(projectList, child) =>
-        val required = projectList.flatMap(_.references)
-        p.copy(child = pushDownRequiredColumns(child, AttributeSet(required)))
-
-      case f @ Filter(condition, child) =>
-        val required = requiredByParent ++ condition.references
-        f.copy(child = pushDownRequiredColumns(child, required))
-
-      case relation: DataSourceV2Relation => relation.reader match {
-        case reader: SupportsPushDownRequiredColumns =>
-          // TODO: Enable the below assert after we make `DataSourceV2Relation` immutable. Fow now
-          // it's possible that the mutable reader being updated by someone else, and we need to
-          // always call `reader.pruneColumns` here to correct it.
-          // assert(relation.output.toStructType == reader.readSchema(),
-          //  "Schema of data source reader does not match the relation plan.")
-
-          val requiredColumns = relation.output.filter(requiredByParent.contains)
-          reader.pruneColumns(requiredColumns.toStructType)
-
-          val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
-          val newOutput = reader.readSchema().map(_.name).map(nameToAttr)
-          relation.copy(output = newOutput)
-
-        case _ => relation
+object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
+  override def apply(
+      plan: LogicalPlan): LogicalPlan = plan transformUp {
+    // PhysicalOperation guarantees that filters are deterministic; no need to check
+    case PhysicalOperation(project, newFilters, relation : DataSourceV2Relation) =>
+      // merge the filters
+      val filters = relation.filters match {
+        case Some(existing) =>
+          existing ++ newFilters
+        case _ =>
+          newFilters
       }
 
-      // TODO: there may be more operators that can be used to calculate the required columns. We
-      // can add more and more in the future.
-      case _ => plan.mapChildren(c => pushDownRequiredColumns(c, c.outputSet))
-    }
-  }
-
-  /**
-   * Finds a Filter node(with an optional Project child) above data source relation.
-   */
-  object FilterAndProject {
-    // returns the project list, the filter condition and the data source relation.
-    def unapply(plan: LogicalPlan)
-        : Option[(Seq[NamedExpression], Expression, DataSourceV2Relation)] = plan match {
+      val projectAttrs = project.map(_.toAttribute)
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(projectAttrs) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        projectAttrs
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
 
-      case Filter(condition, r: DataSourceV2Relation) => Some((r.output, condition, r))
+      val newRelation = relation.copy(
+        projection = projection.asInstanceOf[Seq[AttributeReference]],
+        filters = Some(filters))
 
-      case Filter(condition, Project(fields, r: DataSourceV2Relation))
-          if fields.forall(_.deterministic) =>
-        val attributeMap = AttributeMap(fields.map(e => e.toAttribute -> e))
-        val substituted = condition.transform {
-          case a: Attribute => attributeMap.getOrElse(a, a)
-        }
-        Some((fields, substituted, r))
+      // Add a Filter for any filters that could not be pushed
+      val unpushedFilter = newRelation.unsupportedFilters.reduceLeftOption(And)
+      val filtered = unpushedFilter.map(Filter(_, newRelation)).getOrElse(newRelation)
 
-      case _ => None
-    }
+      // Add a Project to ensure the output matches the required projection
+      if (newRelation.output != projectAttrs) {
+        Project(project, filtered)
+      } else {
+        filtered
+      }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index c3294d6..2c1d6c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -201,7 +201,7 @@ class ContinuousExecution(
     val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan)
 
     val reader = withSink.collect {
-      case DataSourceV2Relation(_, r: ContinuousReader) => r
+      case StreamingDataSourceV2Relation(_, r: ContinuousReader) => r
     }.head
 
     reportTimeTaken("queryPlanning") {

http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index a1c87fb..1157a35 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -146,7 +146,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
     Seq(classOf[SchemaRequiredDataSource], classOf[JavaSchemaRequiredDataSource]).foreach { cls =>
       withClue(cls.getName) {
         val e = intercept[AnalysisException](spark.read.format(cls.getName).load())
-        assert(e.message.contains("A schema needs to be specified"))
+        assert(e.message.contains("requires a user-supplied schema"))
 
         val schema = new StructType().add("i", "int").add("s", "string")
         val df = spark.read.format(cls.getName).schema(schema).load()

http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 37fe595..159dd0e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -38,9 +38,9 @@ import org.apache.spark.sql.{Dataset, Encoder, QueryTest, Row}
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, ContinuousTrigger, EpochCoordinatorRef, IncrementAndGetEpoch}
+import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochCoordinatorRef, IncrementAndGetEpoch}
 import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
 import org.apache.spark.sql.execution.streaming.state.StateStore
 import org.apache.spark.sql.streaming.StreamingQueryListener._
@@ -605,7 +605,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
                 plan
                   .collect {
                     case StreamingExecutionRelation(s, _) => s
-                    case DataSourceV2Relation(_, r) => r
+                    case StreamingDataSourceV2Relation(_, r) => r
                   }
                   .zipWithIndex
                   .find(_._1 == source)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org