Posted to commits@spark.apache.org by li...@apache.org on 2018/02/13 05:12:27 UTC
spark git commit: [SPARK-23303][SQL] improve the explain result for data source v2 relations
Repository: spark
Updated Branches:
refs/heads/master ed4e78bd6 -> f17b936f0
[SPARK-23303][SQL] improve the explain result for data source v2 relations
## What changes were proposed in this pull request?
The current explain result for data source v2 relations is unreadable:
```
== Parsed Logical Plan ==
'Filter ('i > 6)
+- AnalysisBarrier
+- Project [j#1]
+- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940
== Analyzed Logical Plan ==
j: int
Project [j#1]
+- Filter (i#0 > 6)
+- Project [j#1, i#0]
+- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940
== Optimized Logical Plan ==
Project [j#1]
+- Filter isnotnull(i#0)
+- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940
== Physical Plan ==
*(1) Project [j#1]
+- *(1) Filter isnotnull(i#0)
+- *(1) DataSourceV2Scan [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940
```
After this PR:
```
== Parsed Logical Plan ==
'Project [unresolvedalias('j, None)]
+- AnalysisBarrier
+- Relation AdvancedDataSourceV2[i#0, j#1]
== Analyzed Logical Plan ==
j: int
Project [j#1]
+- Relation AdvancedDataSourceV2[i#0, j#1]
== Optimized Logical Plan ==
Relation AdvancedDataSourceV2[j#1]
== Physical Plan ==
*(1) Scan AdvancedDataSourceV2[j#1]
```
-------
```
== Analyzed Logical Plan ==
i: int, j: int
Filter (i#88 > 3)
+- Relation JavaAdvancedDataSourceV2[i#88, j#89]
== Optimized Logical Plan ==
Filter isnotnull(i#88)
+- Relation JavaAdvancedDataSourceV2[i#88, j#89] (PushedFilter: [GreaterThan(i,3)])
== Physical Plan ==
*(1) Filter isnotnull(i#88)
+- *(1) Scan JavaAdvancedDataSourceV2[i#88, j#89] (PushedFilter: [GreaterThan(i,3)])
```
An example for a streaming query:
```
== Parsed Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
+- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
+- Streaming Relation FakeDataSourceV2$[value#25]
== Analyzed Logical Plan ==
value: string, count(1): bigint
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
+- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
+- Streaming Relation FakeDataSourceV2$[value#25]
== Optimized Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
+- DeserializeToObject value#25.toString, obj#4: java.lang.String
+- Streaming Relation FakeDataSourceV2$[value#25]
== Physical Plan ==
*(4) HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#11L])
+- StateStoreSave [value#6], state info [ checkpoint = *********(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5], Complete, 0
+- *(3) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
+- StateStoreRestore [value#6], state info [ checkpoint = *********(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5]
+- *(2) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
+- Exchange hashpartitioning(value#6, 5)
+- *(1) HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#16L])
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
+- *(1) MapElements <function1>, obj#5: java.lang.String
+- *(1) DeserializeToObject value#25.toString, obj#4: java.lang.String
+- *(1) Scan FakeDataSourceV2$[value#25]
```
## How was this patch tested?
N/A
Author: Wenchen Fan <we...@databricks.com>
Closes #20477 from cloud-fan/explain.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f17b936f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f17b936f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f17b936f
Branch: refs/heads/master
Commit: f17b936f0ddb7d46d1349bd42f9a64c84c06e48d
Parents: ed4e78b
Author: Wenchen Fan <we...@databricks.com>
Authored: Mon Feb 12 21:12:22 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Feb 12 21:12:22 2018 -0800
----------------------------------------------------------------------
.../kafka010/KafkaContinuousSourceSuite.scala | 18 +---
.../sql/kafka010/KafkaContinuousTest.scala | 3 +-
.../spark/sql/kafka010/KafkaSourceSuite.scala | 3 +-
.../org/apache/spark/sql/DataFrameReader.scala | 8 +-
.../datasources/v2/DataSourceReaderHolder.scala | 64 -------------
.../datasources/v2/DataSourceV2QueryPlan.scala | 96 ++++++++++++++++++++
.../datasources/v2/DataSourceV2Relation.scala | 26 +++---
.../datasources/v2/DataSourceV2ScanExec.scala | 6 +-
.../datasources/v2/DataSourceV2Strategy.scala | 4 +-
.../v2/PushDownOperatorsToDataSource.scala | 4 +-
.../streaming/MicroBatchExecution.scala | 22 +++--
.../continuous/ContinuousExecution.scala | 9 +-
.../spark/sql/streaming/StreamSuite.scala | 8 +-
.../apache/spark/sql/streaming/StreamTest.scala | 2 +-
.../streaming/continuous/ContinuousSuite.scala | 11 +--
15 files changed, 157 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f17b936f/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index a7083fa..72ee0c5 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -17,20 +17,9 @@
package org.apache.spark.sql.kafka010
-import java.util.Properties
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.scalatest.time.SpanSugar._
-import scala.collection.mutable
-import scala.util.Random
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
+import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.streaming.StreamExecution
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
-import org.apache.spark.sql.streaming.{StreamTest, Trigger}
-import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+import org.apache.spark.sql.streaming.Trigger
// Run tests in KafkaSourceSuiteBase in continuous execution mode.
class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest
@@ -71,7 +60,8 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.logical.collectFirst {
- case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
+ case r: DataSourceV2Relation if r.reader.isInstanceOf[KafkaContinuousReader] =>
+ r.reader.asInstanceOf[KafkaContinuousReader]
}.exists { r =>
// Ensure the new topic is present and the old topic is gone.
r.knownPartitions.exists(_.topic == topic2)
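The Kafka test helpers above switch from a positional extractor pattern to a type pattern with a guard on `reader`. The sketch below uses hypothetical toy classes (`Reader`, `ContinuousReaderLike`, `Relation`), not Spark's, to show that both forms select the same node. One plausible reason for preferring the guarded form is that it does not have to list every constructor field, so it keeps compiling when fields such as `source` and `isStreaming` are added.

```scala
// Hypothetical toy types, not Spark's classes.
trait Reader
final class ContinuousReaderLike(val partitions: Int) extends Reader
final class OtherReader extends Reader

// Shaped like DataSourceV2Relation after this commit: output, source, reader, isStreaming.
final case class Relation(output: Seq[String], source: String, reader: Reader, isStreaming: Boolean)

object PatternSketch {
  // Positional extractor: needs one sub-pattern per constructor field,
  // so it must be edited whenever a field is added.
  def positional(plans: Seq[Any]): Option[ContinuousReaderLike] = plans.collectFirst {
    case Relation(_, _, r: ContinuousReaderLike, _) => r
  }

  // Type pattern plus guard, as in the diff: only the `reader` field is mentioned.
  def guarded(plans: Seq[Any]): Option[ContinuousReaderLike] = plans.collectFirst {
    case r: Relation if r.reader.isInstanceOf[ContinuousReaderLike] =>
      r.reader.asInstanceOf[ContinuousReaderLike]
  }
}
```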
http://git-wip-us.apache.org/repos/asf/spark/blob/f17b936f/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index 5a1a14f..d34458a 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -47,7 +47,8 @@ trait KafkaContinuousTest extends KafkaSourceTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.logical.collectFirst {
- case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
+ case r: DataSourceV2Relation if r.reader.isInstanceOf[KafkaContinuousReader] =>
+ r.reader.asInstanceOf[KafkaContinuousReader]
}.exists(_.knownPartitions.size == newCount),
s"query never reconfigured to $newCount partitions")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f17b936f/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 02c8764..cb09cce 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -117,7 +117,8 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
} ++ (query.get.lastExecution match {
case null => Seq()
case e => e.logical.collect {
- case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
+ case r: DataSourceV2Relation if r.reader.isInstanceOf[KafkaContinuousReader] =>
+ r.reader.asInstanceOf[KafkaContinuousReader]
}
})
if (sources.isEmpty) {
http://git-wip-us.apache.org/repos/asf/spark/blob/f17b936f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index fcaf8d6..984b651 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -189,11 +189,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
- val ds = cls.newInstance()
+ val ds = cls.newInstance().asInstanceOf[DataSourceV2]
val options = new DataSourceOptions((extraOptions ++
- DataSourceV2Utils.extractSessionConfigs(
- ds = ds.asInstanceOf[DataSourceV2],
- conf = sparkSession.sessionState.conf)).asJava)
+ DataSourceV2Utils.extractSessionConfigs(ds, sparkSession.sessionState.conf)).asJava)
// Streaming also uses the data source V2 API. So it may be that the data source implements
// v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
@@ -221,7 +219,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
if (reader == null) {
loadV1Source(paths: _*)
} else {
- Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
+ Dataset.ofRows(sparkSession, DataSourceV2Relation(ds, reader))
}
} else {
loadV1Source(paths: _*)
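In `DataFrameReader`, the new instance is now cast to `DataSourceV2` once, right after the `isAssignableFrom` check, and that typed value is passed both to `extractSessionConfigs` and to `DataSourceV2Relation`. A generic sketch of this check, instantiate, cast-once pattern, using a hypothetical helper `instantiateAs` and JDK classes so it runs without Spark. It calls `getDeclaredConstructor().newInstance()` rather than the deprecated `Class.newInstance()` used in the diff.

```scala
object LoadSketch {
  // Hypothetical helper: create `cls` through its public no-arg constructor, but only
  // if it implements `iface`; the result is typed once, at creation time.
  def instantiateAs[T](cls: Class[_], iface: Class[T]): Option[T] =
    if (iface.isAssignableFrom(cls)) {
      Some(iface.cast(cls.getDeclaredConstructor().newInstance()))
    } else {
      None
    }
}
```

For example, `LoadSketch.instantiateAs(classOf[java.util.ArrayList[_]], classOf[java.util.List[_]])` yields a typed empty list, while a class that does not implement the interface yields `None`.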
http://git-wip-us.apache.org/repos/asf/spark/blob/f17b936f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
deleted file mode 100644
index 81219e9..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import java.util.Objects
-
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.sources.v2.reader._
-
-/**
- * A base class for data source reader holder with customized equals/hashCode methods.
- */
-trait DataSourceReaderHolder {
-
- /**
- * The output of the data source reader, w.r.t. column pruning.
- */
- def output: Seq[Attribute]
-
- /**
- * The held data source reader.
- */
- def reader: DataSourceReader
-
- /**
- * The metadata of this data source reader that can be used for equality test.
- */
- private def metadata: Seq[Any] = {
- val filters: Any = reader match {
- case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet
- case s: SupportsPushDownFilters => s.pushedFilters().toSet
- case _ => Nil
- }
- Seq(output, reader.getClass, filters)
- }
-
- def canEqual(other: Any): Boolean
-
- override def equals(other: Any): Boolean = other match {
- case other: DataSourceReaderHolder =>
- canEqual(other) && metadata.length == other.metadata.length &&
- metadata.zip(other.metadata).forall { case (l, r) => l == r }
- case _ => false
- }
-
- override def hashCode(): Int = {
- metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f17b936f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala
new file mode 100644
index 0000000..1e0d088
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util.Objects
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.util.Utils
+
+/**
+ * A base class for data source v2 related query plan(both logical and physical). It defines the
+ * equals/hashCode methods, and provides a string representation of the query plan, according to
+ * some common information.
+ */
+trait DataSourceV2QueryPlan {
+
+ /**
+ * The output of the data source reader, w.r.t. column pruning.
+ */
+ def output: Seq[Attribute]
+
+ /**
+ * The instance of this data source implementation. Note that we only consider its class in
+ * equals/hashCode, not the instance itself.
+ */
+ def source: DataSourceV2
+
+ /**
+ * The created data source reader. Here we use it to get the filters that has been pushed down
+ * so far, itself doesn't take part in the equals/hashCode.
+ */
+ def reader: DataSourceReader
+
+ private lazy val filters = reader match {
+ case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet
+ case s: SupportsPushDownFilters => s.pushedFilters().toSet
+ case _ => Set.empty
+ }
+
+ /**
+ * The metadata of this data source query plan that can be used for equality check.
+ */
+ private def metadata: Seq[Any] = Seq(output, source.getClass, filters)
+
+ def canEqual(other: Any): Boolean
+
+ override def equals(other: Any): Boolean = other match {
+ case other: DataSourceV2QueryPlan => canEqual(other) && metadata == other.metadata
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b)
+ }
+
+ def metadataString: String = {
+ val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
+ if (filters.nonEmpty) entries += "PushedFilter" -> filters.mkString("[", ", ", "]")
+
+ val outputStr = Utils.truncatedString(output, "[", ", ", "]")
+
+ val entriesStr = if (entries.nonEmpty) {
+ Utils.truncatedString(entries.map {
+ case (key, value) => key + ": " + StringUtils.abbreviate(redact(value), 100)
+ }, " (", ", ", ")")
+ } else {
+ ""
+ }
+
+ s"${source.getClass.getSimpleName}$outputStr$entriesStr"
+ }
+
+ private def redact(text: String): String = {
+ Utils.redact(SQLConf.get.stringRedationPattern, text)
+ }
+}
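The format produced by `metadataString` can be reproduced without Spark. The sketch below is a simplified, assumed re-implementation: `truncatedString` and `abbreviate` are hypothetical stand-ins for `Utils.truncatedString` and commons-lang3 `StringUtils.abbreviate`, redaction is omitted, and filters are passed as an ordered `Seq` (the real trait uses a `Set`) so the output is deterministic. The `sourceName` argument plays the role of `source.getClass.getSimpleName`; for a Scala `object` such as `FakeDataSourceV2` that name ends in `$`, which is why the streaming example prints `FakeDataSourceV2$`.

```scala
object MetadataStringSketch {
  // Simplified stand-in for Utils.truncatedString: keeps at most maxFields - 1 items
  // when the list is too long and summarizes the rest.
  def truncatedString(items: Seq[String], start: String, sep: String, end: String,
      maxFields: Int = 25): String =
    if (items.length > maxFields) {
      val shown = math.max(0, maxFields - 1)
      items.take(shown).mkString(start, sep, sep) +
        s"... ${items.length - shown} more fields" + end
    } else {
      items.mkString(start, sep, end)
    }

  // Same result as StringUtils.abbreviate(s, maxWidth) for maxWidth >= 4.
  def abbreviate(s: String, maxWidth: Int): String =
    if (s.length <= maxWidth) s else s.take(maxWidth - 3) + "..."

  // Mirrors the structure of DataSourceV2QueryPlan.metadataString (minus redaction).
  def metadataString(sourceName: String, output: Seq[String], pushedFilters: Seq[String]): String = {
    val entries =
      if (pushedFilters.nonEmpty) Seq("PushedFilter" -> pushedFilters.mkString("[", ", ", "]"))
      else Seq.empty[(String, String)]
    val outputStr = truncatedString(output, "[", ", ", "]")
    val entriesStr =
      if (entries.nonEmpty) {
        truncatedString(entries.map { case (k, v) => k + ": " + abbreviate(v, 100) }, " (", ", ", ")")
      } else {
        ""
      }
    s"$sourceName$outputStr$entriesStr"
  }
}
```

Called with `"JavaAdvancedDataSourceV2"`, `Seq("i#88", "j#89")` and `Seq("GreaterThan(i,3)")`, it returns the same suffix that appears in the pushed-filter example in the description.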
http://git-wip-us.apache.org/repos/asf/spark/blob/f17b936f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 38f6b15..cd97e0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -20,15 +20,23 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader._
case class DataSourceV2Relation(
output: Seq[AttributeReference],
- reader: DataSourceReader)
- extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+ source: DataSourceV2,
+ reader: DataSourceReader,
+ override val isStreaming: Boolean)
+ extends LeafNode with MultiInstanceRelation with DataSourceV2QueryPlan {
override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]
+ override def simpleString: String = {
+ val streamingHeader = if (isStreaming) "Streaming " else ""
+ s"${streamingHeader}Relation $metadataString"
+ }
+
override def computeStats(): Statistics = reader match {
case r: SupportsReportStatistics =>
Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
@@ -41,18 +49,8 @@ case class DataSourceV2Relation(
}
}
-/**
- * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
- * to the non-streaming relation.
- */
-class StreamingDataSourceV2Relation(
- output: Seq[AttributeReference],
- reader: DataSourceReader) extends DataSourceV2Relation(output, reader) {
- override def isStreaming: Boolean = true
-}
-
object DataSourceV2Relation {
- def apply(reader: DataSourceReader): DataSourceV2Relation = {
- new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
+ def apply(source: DataSourceV2, reader: DataSourceReader): DataSourceV2Relation = {
+ new DataSourceV2Relation(reader.readSchema().toAttributes, source, reader, isStreaming = false)
}
}
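The removed `StreamingDataSourceV2Relation` subclass is replaced by an `isStreaming` constructor field, and `simpleString` prepends `Streaming ` when the field is set. A toy model of that rendering; `ToyRelation` is hypothetical and formats the output list directly instead of going through `metadataString`.

```scala
// Hypothetical toy relation: one case class with a streaming flag, instead of a subclass.
final case class ToyRelation(output: Seq[String], sourceName: String, isStreaming: Boolean) {
  // Same shape as the new DataSourceV2Relation.simpleString.
  def simpleString: String = {
    val streamingHeader = if (isStreaming) "Streaming " else ""
    val outputStr = output.mkString("[", ", ", "]")
    s"${streamingHeader}Relation $sourceName$outputStr"
  }
}
```

Scala generally discourages extending a case class, because the generated `copy`, `equals` and extractor only know about the parent's fields; a plain flag on the single case class sidesteps that.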
http://git-wip-us.apache.org/repos/asf/spark/blob/f17b936f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 7d9581b..c99d535 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
import org.apache.spark.sql.types.StructType
@@ -36,11 +37,14 @@ import org.apache.spark.sql.types.StructType
*/
case class DataSourceV2ScanExec(
output: Seq[AttributeReference],
+ @transient source: DataSourceV2,
@transient reader: DataSourceReader)
- extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
+ extends LeafExecNode with DataSourceV2QueryPlan with ColumnarBatchScan {
override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]
+ override def simpleString: String = s"Scan $metadataString"
+
override def outputPartitioning: physical.Partitioning = reader match {
case s: SupportsReportPartitioning =>
new DataSourcePartitioning(
http://git-wip-us.apache.org/repos/asf/spark/blob/f17b936f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index df5b524..fb61e6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql.execution.SparkPlan
object DataSourceV2Strategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case DataSourceV2Relation(output, reader) =>
- DataSourceV2ScanExec(output, reader) :: Nil
+ case r: DataSourceV2Relation =>
+ DataSourceV2ScanExec(r.output, r.source, r.reader) :: Nil
case WriteToDataSourceV2(writer, query) =>
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
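Both `DataSourceV2Relation` and `DataSourceV2ScanExec` mix in `DataSourceV2QueryPlan`, whose `equals`/`hashCode` compare `(output, source.getClass, pushed filters)` rather than the source or reader instances. A minimal sketch of that equality rule with hypothetical classes (`PlanSource`, `ToyPlan`); it omits `canEqual` and uses Scala's `##` in place of `java.util.Objects.hashCode`.

```scala
// Hypothetical source class; separate instances of it should not affect equality.
class PlanSource

final class ToyPlan(val output: Seq[String], val source: AnyRef, val pushedFilters: Set[String]) {
  // Equality key, mirroring DataSourceV2QueryPlan.metadata: the source's class, not the instance.
  private def metadata: Seq[Any] = Seq(output, source.getClass, pushedFilters)

  override def equals(other: Any): Boolean = other match {
    case that: ToyPlan => metadata == that.metadata
    case _ => false
  }

  // Same 31-based fold as the diff.
  override def hashCode(): Int = metadata.map(_.##).foldLeft(0)((a, b) => 31 * a + b)
}
```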
http://git-wip-us.apache.org/repos/asf/spark/blob/f17b936f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
index 1ca6cbf..4cfdd50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
@@ -39,11 +39,11 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
// TODO: Ideally column pruning should be implemented via a plan property that is propagated
// top-down, then we can simplify the logic here and only collect target operators.
val filterPushed = plan transformUp {
- case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, reader)) =>
+ case FilterAndProject(fields, condition, r: DataSourceV2Relation) =>
val (candidates, nonDeterministic) =
splitConjunctivePredicates(condition).partition(_.deterministic)
- val stayUpFilters: Seq[Expression] = reader match {
+ val stayUpFilters: Seq[Expression] = r.reader match {
case r: SupportsPushDownCatalystFilters =>
r.pushCatalystFilters(candidates.toArray)
http://git-wip-us.apache.org/repos/asf/spark/blob/f17b936f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 8125333..84564b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -27,9 +27,9 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
@@ -52,6 +52,8 @@ class MicroBatchExecution(
@volatile protected var sources: Seq[BaseStreamingSource] = Seq.empty
+ private val readerToDataSourceMap = MutableMap.empty[MicroBatchReader, DataSourceV2]
+
private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
case OneTimeTrigger => OneTimeExecutor()
@@ -90,6 +92,7 @@ class MicroBatchExecution(
metadataPath,
new DataSourceOptions(options.asJava))
nextSourceId += 1
+ readerToDataSourceMap(reader) = source
StreamingExecutionRelation(reader, output)(sparkSession)
})
case s @ StreamingRelationV2(_, sourceName, _, output, v1Relation) =>
@@ -405,12 +408,15 @@ class MicroBatchExecution(
case v1: SerializedOffset => reader.deserializeOffset(v1.json)
case v2: OffsetV2 => v2
}
- reader.setOffsetRange(
- toJava(current),
- Optional.of(availableV2))
+ reader.setOffsetRange(toJava(current), Optional.of(availableV2))
logDebug(s"Retrieving data from $reader: $current -> $availableV2")
- Some(reader ->
- new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader))
+ Some(reader -> new DataSourceV2Relation(
+ reader.readSchema().toAttributes,
+ // Provide a fake value here just in case something went wrong, e.g. the reader gives
+ // a wrong `equals` implementation.
+ readerToDataSourceMap.getOrElse(reader, FakeDataSourceV2),
+ reader,
+ isStreaming = true))
case _ => None
}
}
@@ -500,3 +506,5 @@ class MicroBatchExecution(
Optional.ofNullable(scalaOption.orNull)
}
}
+
+object FakeDataSourceV2 extends DataSourceV2
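`MicroBatchExecution` now records which `DataSourceV2` produced each `MicroBatchReader` and falls back to `FakeDataSourceV2` when the lookup misses. A sketch of that map-with-fallback pattern; `StreamSource`, `NamedSource`, `FakeSource`, `ToyReader` and `ReaderMapSketch` are hypothetical stand-ins, not Spark types.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for DataSourceV2 and MicroBatchReader.
trait StreamSource { def name: String }
final case class NamedSource(name: String) extends StreamSource
object FakeSource extends StreamSource { val name = "FakeSource" }
final class ToyReader(val id: Int) // default reference equality

object ReaderMapSketch {
  // Keyed by reader, so lookups rely on the reader's equals/hashCode.
  private val readerToSource = mutable.Map.empty[ToyReader, StreamSource]

  def register(reader: ToyReader, source: StreamSource): Unit =
    readerToSource(reader) = source

  // Placeholder source if the reader was never registered or its equals/hashCode misbehave.
  def sourceFor(reader: ToyReader): StreamSource =
    readerToSource.getOrElse(reader, FakeSource)
}
```

Because the map is keyed by reader, a reader with an inconsistent `equals`/`hashCode` can miss its own entry; the fallback keeps planning going in that case, which is the situation the comment in the diff describes.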
http://git-wip-us.apache.org/repos/asf/spark/blob/f17b936f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index c3294d6..f87d57d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, PartitionOffset}
@@ -167,7 +167,7 @@ class ContinuousExecution(
var insertedSourceId = 0
val withNewSources = logicalPlan transform {
- case ContinuousExecutionRelation(_, _, output) =>
+ case ContinuousExecutionRelation(ds, _, output) =>
val reader = continuousSources(insertedSourceId)
insertedSourceId += 1
val newOutput = reader.readSchema().toAttributes
@@ -180,7 +180,7 @@ class ContinuousExecution(
val loggedOffset = offsets.offsets(0)
val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
reader.setStartOffset(java.util.Optional.ofNullable(realOffset.orNull))
- new StreamingDataSourceV2Relation(newOutput, reader)
+ new DataSourceV2Relation(newOutput, ds, reader, isStreaming = true)
}
// Rewire the plan to use the new attributes that were returned by the source.
@@ -201,7 +201,8 @@ class ContinuousExecution(
val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan)
val reader = withSink.collect {
- case DataSourceV2Relation(_, r: ContinuousReader) => r
+ case r: DataSourceV2Relation if r.reader.isInstanceOf[ContinuousReader] =>
+ r.reader.asInstanceOf[ContinuousReader]
}.head
reportTimeTaken("queryPlanning") {
http://git-wip-us.apache.org/repos/asf/spark/blob/f17b936f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index d1a0483..70eb9f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -492,16 +492,16 @@ class StreamSuite extends StreamTest {
val explainWithoutExtended = q.explainInternal(false)
// `extended = false` only displays the physical plan.
- assert("StreamingDataSourceV2Relation".r.findAllMatchIn(explainWithoutExtended).size === 0)
- assert("DataSourceV2Scan".r.findAllMatchIn(explainWithoutExtended).size === 1)
+ assert("Streaming Relation".r.findAllMatchIn(explainWithoutExtended).size === 0)
+ assert("Scan FakeDataSourceV2".r.findAllMatchIn(explainWithoutExtended).size === 1)
// Use "StateStoreRestore" to verify that it does output a streaming physical plan
assert(explainWithoutExtended.contains("StateStoreRestore"))
val explainWithExtended = q.explainInternal(true)
// `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
// plan.
- assert("StreamingDataSourceV2Relation".r.findAllMatchIn(explainWithExtended).size === 3)
- assert("DataSourceV2Scan".r.findAllMatchIn(explainWithExtended).size === 1)
+ assert("Streaming Relation".r.findAllMatchIn(explainWithExtended).size === 3)
+ assert("Scan FakeDataSourceV2".r.findAllMatchIn(explainWithExtended).size === 1)
// Use "StateStoreRestore" to verify that it does output a streaming physical plan
assert(explainWithExtended.contains("StateStoreRestore"))
} finally {
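The updated `StreamSuite` assertions count how often a marker string (for example `"Scan FakeDataSourceV2"`) appears in the explain output by turning the string into a regex and calling `findAllMatchIn(...).size`. The sketch below isolates that counting technique. The helper name `countMatches` and the sample explain text are hypothetical and for illustration only; they are not real Spark output. Unlike the test, the sketch passes the marker through `Pattern.quote` so that any regex metacharacters in it are matched literally.

```scala
import java.util.regex.Pattern

object ExplainCount {
  // Counts non-overlapping literal occurrences of `marker` in `text`,
  // like `marker.r.findAllMatchIn(text).size` in the test, but with the
  // marker quoted so characters such as '[' or '(' are not treated as regex syntax.
  def countMatches(marker: String, text: String): Int =
    Pattern.quote(marker).r.findAllMatchIn(text).size

  // Hypothetical non-extended explain text (physical plan only).
  val hypotheticalPhysicalOnly: String =
    """== Physical Plan ==
      |*(1) Project [j#1]
      |+- *(1) Scan FakeDataSourceV2[i#0, j#1]""".stripMargin
}
```

With this sample text, `countMatches("Scan FakeDataSourceV2", ...)` finds one occurrence and `countMatches("Streaming Relation", ...)` finds none, which mirrors the shape of the `extended = false` assertions.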
http://git-wip-us.apache.org/repos/asf/spark/blob/f17b936f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 37fe595..2543946 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -605,7 +605,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
plan
.collect {
case StreamingExecutionRelation(s, _) => s
- case DataSourceV2Relation(_, r) => r
+ case d: DataSourceV2Relation => d.reader
}
.zipWithIndex
.find(_._1 == source)
http://git-wip-us.apache.org/repos/asf/spark/blob/f17b936f/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 4b4ed82..9ee9aaf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -17,15 +17,12 @@
package org.apache.spark.sql.streaming.continuous
-import java.util.UUID
-
-import org.apache.spark.{SparkContext, SparkEnv, SparkException}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart}
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.test.TestSparkSession
@@ -43,7 +40,7 @@ class ContinuousSuiteBase extends StreamTest {
case s: ContinuousExecution =>
assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
val reader = s.lastExecution.executedPlan.collectFirst {
- case DataSourceV2ScanExec(_, r: RateStreamContinuousReader) => r
+ case DataSourceV2ScanExec(_, _, r: RateStreamContinuousReader) => r
}.get
val deltaMs = numTriggers * 1000 + 300