You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/10/11 20:51:36 UTC
spark git commit: [SPARK-21988][SS] Implement StreamingRelation.computeStats to fix explain
Repository: spark
Updated Branches:
refs/heads/master 655f6f86f -> 645e108ee
[SPARK-21988][SS] Implement StreamingRelation.computeStats to fix explain
## What changes were proposed in this pull request?
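Implement `StreamingRelation.computeStats` to fix `explain`: when a rule such as `CostBasedJoinReorder` calls `computeStats` on a plan that still contains a `StreamingRelation`, the relation now reports the session's default size in bytes.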
## How was this patch tested?
- unit tests: `StreamingRelation.computeStats` and `StreamingExecutionRelation.computeStats`.
- regression tests: `explain join with a normal source` and `explain join with MemoryStream`.
Author: Shixiong Zhu <zs...@gmail.com>
Closes #19465 from zsxwing/SPARK-21988.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/645e108e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/645e108e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/645e108e
Branch: refs/heads/master
Commit: 645e108eeb6364e57f5d7213dbbd42dbcf1124d3
Parents: 655f6f8
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Wed Oct 11 13:51:33 2017 -0700
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Wed Oct 11 13:51:33 2017 -0700
----------------------------------------------------------------------
.../execution/streaming/StreamingRelation.scala | 8 +++
.../spark/sql/streaming/StreamSuite.scala | 65 +++++++++++++++++---
2 files changed, 63 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/645e108e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index ab71605..6b82c78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -44,6 +44,14 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
extends LeafNode {
override def isStreaming: Boolean = true
override def toString: String = sourceName
+
+ // There's no sensible value here. On the execution path, this relation will be
+ // swapped out with microbatches. But some dataframe operations (in particular explain) do lead
+ // to this node surviving analysis. So we satisfy the LeafNode contract with the session default
+ // value.
+ override def computeStats(): Statistics = Statistics(
+ sizeInBytes = BigInt(dataSource.sparkSession.sessionState.conf.defaultSizeInBytes)
+ )
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/645e108e/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 9c90106..3d687d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -76,20 +76,65 @@ class StreamSuite extends StreamTest {
CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four")))
}
+ test("StreamingRelation.computeStats") {
+ val streamingRelation = spark.readStream.format("rate").load().logicalPlan collect {
+ case s: StreamingRelation => s
+ }
+ assert(streamingRelation.nonEmpty, "cannot find StreamingRelation")
+ assert(
+ streamingRelation.head.computeStats.sizeInBytes == spark.sessionState.conf.defaultSizeInBytes)
+ }
- test("explain join") {
- // Make a table and ensure it will be broadcast.
- val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+ test("StreamingExecutionRelation.computeStats") {
+ val streamingExecutionRelation = MemoryStream[Int].toDF.logicalPlan collect {
+ case s: StreamingExecutionRelation => s
+ }
+ assert(streamingExecutionRelation.nonEmpty, "cannot find StreamingExecutionRelation")
+ assert(streamingExecutionRelation.head.computeStats.sizeInBytes
+ == spark.sessionState.conf.defaultSizeInBytes)
+ }
- // Join the input stream with a table.
- val inputData = MemoryStream[Int]
- val joined = inputData.toDF().join(smallTable, smallTable("number") === $"value")
+ test("explain join with a normal source") {
+ // This test triggers CostBasedJoinReorder to call `computeStats`
+ withSQLConf(SQLConf.CBO_ENABLED.key -> "true", SQLConf.JOIN_REORDER_ENABLED.key -> "true") {
+ val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+ val smallTable2 = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+ val smallTable3 = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+
+ // Join the input stream with a table.
+ val df = spark.readStream.format("rate").load()
+ val joined = df.join(smallTable, smallTable("number") === $"value")
+ .join(smallTable2, smallTable2("number") === $"value")
+ .join(smallTable3, smallTable3("number") === $"value")
+
+ val outputStream = new java.io.ByteArrayOutputStream()
+ Console.withOut(outputStream) {
+ joined.explain(true)
+ }
+ assert(outputStream.toString.contains("StreamingRelation"))
+ }
+ }
- val outputStream = new java.io.ByteArrayOutputStream()
- Console.withOut(outputStream) {
- joined.explain()
+ test("explain join with MemoryStream") {
+ // This test triggers CostBasedJoinReorder to call `computeStats`
+ // Because MemoryStream doesn't use DataSource code path, we need a separate test.
+ withSQLConf(SQLConf.CBO_ENABLED.key -> "true", SQLConf.JOIN_REORDER_ENABLED.key -> "true") {
+ val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+ val smallTable2 = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+ val smallTable3 = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+
+ // Join the input stream with a table.
+ val df = MemoryStream[Int].toDF
+ val joined = df.join(smallTable, smallTable("number") === $"value")
+ .join(smallTable2, smallTable2("number") === $"value")
+ .join(smallTable3, smallTable3("number") === $"value")
+
+ val outputStream = new java.io.ByteArrayOutputStream()
+ Console.withOut(outputStream) {
+ joined.explain(true)
+ }
+ assert(outputStream.toString.contains("StreamingRelation"))
}
- assert(outputStream.toString.contains("StreamingRelation"))
}
test("SPARK-20432: union one stream with itself") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org