Posted to commits@spark.apache.org by zs...@apache.org on 2017/09/14 18:06:28 UTC

spark git commit: [SPARK-21988] Add default stats to StreamingExecutionRelation.

Repository: spark
Updated Branches:
  refs/heads/master ddd7f5e11 -> 054ddb2f5


[SPARK-21988] Add default stats to StreamingExecutionRelation.

## What changes were proposed in this pull request?

Add default stats to StreamingExecutionRelation. The relation is a LeafNode, so it must implement computeStats(), but there is no sensible size estimate for a streaming source. This patch satisfies the LeafNode contract with the session's defaultSizeInBytes, threading a SparkSession into the relation (as a second, curried parameter list) so the conf value can be read. This matters because some DataFrame operations, explain() in particular, let the node survive analysis.
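
In essence, the change is the pattern below. This is a minimal self-contained sketch (the class name is illustrative; the real change to StreamingExecutionRelation is in the diff further down):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}

// A leaf node that cannot estimate its own size, so it falls back to the
// session-wide default. The SparkSession rides in a second parameter list,
// which keeps it out of the case class's equals/hashCode and pattern match.
case class SessionDefaultStatsRelation(
    output: Seq[Attribute])(session: SparkSession)
  extends LeafNode {

  override def computeStats(): Statistics = Statistics(
    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes))
}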

## How was this patch tested?

Existing unit tests, plus a new explain() test in StreamSuite to be sure the plan can be rendered with the default stats in place.
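
The verification approach in that test is to capture the console output of explain() on a stream-to-batch join. A condensed sketch, assuming it runs inside a StreamTest suite with the session's implicits in scope:

// Before this patch, planning a join against a streaming relation with no
// stats could fail as soon as something asked for sizeInBytes (e.g. the
// broadcast-join size check); explain() is a convenient way to force that
// path without actually running the query.
val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
val inputData = MemoryStream[Int]
val joined = inputData.toDF().join(smallTable, smallTable("number") === $"value")

val out = new java.io.ByteArrayOutputStream()
Console.withOut(out) {
  joined.explain() // should print the plan rather than throw
}
assert(out.toString.contains("StreamingRelation"))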

Author: Jose Torres <jo...@databricks.com>

Closes #19212 from joseph-torres/SPARK-21988.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/054ddb2f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/054ddb2f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/054ddb2f

Branch: refs/heads/master
Commit: 054ddb2f54ab8e6b0088fbf9d576c7770e5abcbf
Parents: ddd7f5e
Author: Jose Torres <jo...@databricks.com>
Authored: Thu Sep 14 11:06:25 2017 -0700
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Thu Sep 14 11:06:25 2017 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   |  2 +-
 .../execution/streaming/StreamingRelation.scala | 20 +++++++++++++++++---
 .../spark/sql/execution/streaming/memory.scala  |  2 +-
 .../spark/sql/streaming/StreamSuite.scala       | 20 +++++++++++++++++++-
 .../sql/streaming/StreamingQuerySuite.scala     |  2 +-
 5 files changed, 39 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/054ddb2f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 71088ff..952e431 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -166,7 +166,7 @@ class StreamExecution(
           nextSourceId += 1
           // We still need to use the previous `output` instead of `source.schema` as attributes in
           // "df.logicalPlan" has already used attributes of the previous `output`.
-          StreamingExecutionRelation(source, output)
+          StreamingExecutionRelation(source, output)(sparkSession)
         })
     }
     sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }

http://git-wip-us.apache.org/repos/asf/spark/blob/054ddb2f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index e8b0009..ab71605 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.DataSource
 
@@ -48,9 +50,21 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
  * Used to link a streaming [[Source]] of data into a
  * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].
  */
-case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode {
+case class StreamingExecutionRelation(
+    source: Source,
+    output: Seq[Attribute])(session: SparkSession)
+  extends LeafNode {
+
   override def isStreaming: Boolean = true
   override def toString: String = source.toString
+
+  // There's no sensible value here. On the execution path, this relation will be
+  // swapped out with microbatches. But some dataframe operations (in particular explain) do lead
+  // to this node surviving analysis. So we satisfy the LeafNode contract with the session default
+  // value.
+  override def computeStats(): Statistics = Statistics(
+    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
+  )
 }
 
 /**
@@ -65,7 +79,7 @@ case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) ext
 }
 
 object StreamingExecutionRelation {
-  def apply(source: Source): StreamingExecutionRelation = {
-    StreamingExecutionRelation(source, source.schema.toAttributes)
+  def apply(source: Source, session: SparkSession): StreamingExecutionRelation = {
+    StreamingExecutionRelation(source, source.schema.toAttributes)(session)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/054ddb2f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index c9784c0..3041d4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -53,7 +53,7 @@ object MemoryStream {
 case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     extends Source with Logging {
   protected val encoder = encoderFor[A]
-  protected val logicalPlan = StreamingExecutionRelation(this)
+  protected val logicalPlan = StreamingExecutionRelation(this, sqlContext.sparkSession)
   protected val output = logicalPlan.output
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/054ddb2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index d0b2041..9c90106 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -76,6 +76,22 @@ class StreamSuite extends StreamTest {
       CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four")))
   }
 
+
+  test("explain join") {
+    // Make a table and ensure it will be broadcast.
+    val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+
+    // Join the input stream with a table.
+    val inputData = MemoryStream[Int]
+    val joined = inputData.toDF().join(smallTable, smallTable("number") === $"value")
+
+    val outputStream = new java.io.ByteArrayOutputStream()
+    Console.withOut(outputStream) {
+      joined.explain()
+    }
+    assert(outputStream.toString.contains("StreamingRelation"))
+  }
+
   test("SPARK-20432: union one stream with itself") {
     val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
     val unioned = df.union(df)
@@ -337,7 +353,9 @@ class StreamSuite extends StreamTest {
 
         override def stop(): Unit = {}
       }
-      val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source))
+      val df = Dataset[Int](
+        sqlContext.sparkSession,
+        StreamingExecutionRelation(source, sqlContext.sparkSession))
       testStream(df)(
         // `ExpectFailure(isFatalError = true)` verifies two things:
         // - Fatal errors can be propagated to `StreamingQuery.exception` and

http://git-wip-us.apache.org/repos/asf/spark/blob/054ddb2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index bf7c448..3823e33 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -653,7 +653,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       }
       override def stop(): Unit = {}
     }
-    StreamingExecutionRelation(source)
+    StreamingExecutionRelation(source, spark)
   }
 
   /** Returns the query progress at the end of the first trigger of streaming DF */

