Posted to commits@spark.apache.org by zs...@apache.org on 2017/10/11 20:51:36 UTC

spark git commit: [SPARK-21988][SS] Implement StreamingRelation.computeStats to fix explain

Repository: spark
Updated Branches:
  refs/heads/master 655f6f86f -> 645e108ee


[SPARK-21988][SS] Implement StreamingRelation.computeStats to fix explain

## What changes were proposed in this pull request?

Implement `StreamingRelation.computeStats` so that `explain` works on plans that still contain a `StreamingRelation` leaf, instead of failing through the inherited `LeafNode.computeStats`, which throws.
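
For context, a minimal repro sketch of the failure this fixes (illustrative only, not part of the patch; assumes a local `SparkSession` named `spark` with `spark.implicits._` imported): before this change, `StreamingRelation` fell back to `LeafNode`'s default `computeStats`, which throws, so optimizer rules that request statistics could make `explain` fail.

```scala
// Illustrative repro, mirroring the new regression test below.
// A streaming source keeps a StreamingRelation leaf in the analyzed plan.
val streamDf = spark.readStream.format("rate").load()
val t1 = Seq((1, "one"), (2, "two")).toDF("number", "word")
val t2 = Seq((1, "uno"), (2, "dos")).toDF("number", "word")
val t3 = Seq((1, "un"), (2, "deux")).toDF("number", "word")

// Cost-based join reordering makes the optimizer ask every leaf for stats.
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")

// Before this patch this could throw from LeafNode.computeStats;
// afterwards the plan prints normally and still shows "StreamingRelation".
streamDf.join(t1, t1("number") === $"value")
  .join(t2, t2("number") === $"value")
  .join(t3, t3("number") === $"value")
  .explain(true)
```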

## How was this patch tested?

- unit tests: `StreamingRelation.computeStats` and `StreamingExecutionRelation.computeStats`.
- regression tests: `explain join with a normal source` and `explain join with MemoryStream`.

Author: Shixiong Zhu <zs...@gmail.com>

Closes #19465 from zsxwing/SPARK-21988.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/645e108e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/645e108e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/645e108e

Branch: refs/heads/master
Commit: 645e108eeb6364e57f5d7213dbbd42dbcf1124d3
Parents: 655f6f8
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Wed Oct 11 13:51:33 2017 -0700
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Wed Oct 11 13:51:33 2017 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamingRelation.scala |  8 +++
 .../spark/sql/streaming/StreamSuite.scala       | 65 +++++++++++++++++---
 2 files changed, 63 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/645e108e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index ab71605..6b82c78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -44,6 +44,14 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
   extends LeafNode {
   override def isStreaming: Boolean = true
   override def toString: String = sourceName
+
+  // There's no sensible value here. On the execution path, this relation will be
+  // swapped out with microbatches. But some dataframe operations (in particular explain) do lead
+  // to this node surviving analysis. So we satisfy the LeafNode contract with the session default
+  // value.
+  override def computeStats(): Statistics = Statistics(
+    sizeInBytes = BigInt(dataSource.sparkSession.sessionState.conf.defaultSizeInBytes)
+  )
 }
 
 /**
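
A note on the contract this hunk satisfies (a simplified sketch, not the actual Spark source): `LeafNode.computeStats` throws by default, so any leaf that can survive analysis must override it. Reporting the session's `defaultSizeInBytes`, which defaults to a very large value, is deliberately conservative: size-based decisions such as auto-broadcast will not favor a `StreamingRelation` on the strength of this estimate.

```scala
// Simplified sketch of the LeafNode contract (illustrative, not Spark source):
// leaves that survive analysis must supply their own statistics.
abstract class LeafNode extends LogicalPlan {
  def computeStats(): Statistics =
    throw new UnsupportedOperationException(s"$nodeName does not implement computeStats")
}
```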

http://git-wip-us.apache.org/repos/asf/spark/blob/645e108e/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 9c90106..3d687d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -76,20 +76,65 @@ class StreamSuite extends StreamTest {
       CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four")))
   }
 
+  test("StreamingRelation.computeStats") {
+    val streamingRelation = spark.readStream.format("rate").load().logicalPlan collect {
+      case s: StreamingRelation => s
+    }
+    assert(streamingRelation.nonEmpty, "cannot find StreamingRelation")
+    assert(
+      streamingRelation.head.computeStats.sizeInBytes == spark.sessionState.conf.defaultSizeInBytes)
+  }
 
-  test("explain join") {
-    // Make a table and ensure it will be broadcast.
-    val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+  test("StreamingExecutionRelation.computeStats") {
+    val streamingExecutionRelation = MemoryStream[Int].toDF.logicalPlan collect {
+      case s: StreamingExecutionRelation => s
+    }
+    assert(streamingExecutionRelation.nonEmpty, "cannot find StreamingExecutionRelation")
+    assert(streamingExecutionRelation.head.computeStats.sizeInBytes
+      == spark.sessionState.conf.defaultSizeInBytes)
+  }
 
-    // Join the input stream with a table.
-    val inputData = MemoryStream[Int]
-    val joined = inputData.toDF().join(smallTable, smallTable("number") === $"value")
+  test("explain join with a normal source") {
+    // This test triggers CostBasedJoinReorder to call `computeStats`
+    withSQLConf(SQLConf.CBO_ENABLED.key -> "true", SQLConf.JOIN_REORDER_ENABLED.key -> "true") {
+      val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+      val smallTable2 = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+      val smallTable3 = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+
+      // Join the input stream with a table.
+      val df = spark.readStream.format("rate").load()
+      val joined = df.join(smallTable, smallTable("number") === $"value")
+        .join(smallTable2, smallTable2("number") === $"value")
+        .join(smallTable3, smallTable3("number") === $"value")
+
+      val outputStream = new java.io.ByteArrayOutputStream()
+      Console.withOut(outputStream) {
+        joined.explain(true)
+      }
+      assert(outputStream.toString.contains("StreamingRelation"))
+    }
+  }
 
-    val outputStream = new java.io.ByteArrayOutputStream()
-    Console.withOut(outputStream) {
-      joined.explain()
+  test("explain join with MemoryStream") {
+    // This test triggers CostBasedJoinReorder to call `computeStats`
+    // Because MemoryStream doesn't use DataSource code path, we need a separate test.
+    withSQLConf(SQLConf.CBO_ENABLED.key -> "true", SQLConf.JOIN_REORDER_ENABLED.key -> "true") {
+      val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+      val smallTable2 = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+      val smallTable3 = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+
+      // Join the input stream with a table.
+      val df = MemoryStream[Int].toDF
+      val joined = df.join(smallTable, smallTable("number") === $"value")
+        .join(smallTable2, smallTable2("number") === $"value")
+        .join(smallTable3, smallTable3("number") === $"value")
+
+      val outputStream = new java.io.ByteArrayOutputStream()
+      Console.withOut(outputStream) {
+        joined.explain(true)
+      }
+      assert(outputStream.toString.contains("StreamingRelation"))
     }
-    assert(outputStream.toString.contains("StreamingRelation"))
   }
 
   test("SPARK-20432: union one stream with itself") {

