Posted to commits@spark.apache.org by zs...@apache.org on 2017/02/16 04:51:37 UTC

spark git commit: [SPARK-19603][SS] Fix StreamingQuery explain command

Repository: spark
Updated Branches:
  refs/heads/master 08c1972a0 -> fc02ef95c


[SPARK-19603][SS] Fix StreamingQuery explain command

## What changes were proposed in this pull request?

`StreamingQuery.explain` doesn't show the correct streaming physical plan right now because `ExplainCommand` receives a runtime batch plan and its `logicalPlan.isStreaming` is always false.

This PR adds a `StreamingExplainCommand` that takes the `IncrementalExecution` of the last batch, so `StreamExecution` can have the plan explained as a streaming plan instead of relying on `logicalPlan.isStreaming`.
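The root cause can be modelled without Spark. The sketch below is a toy, not Spark code: `Plan`, `StreamingSource`, `BatchData`, `Aggregate` and `ExplainSketch` are hypothetical names chosen for illustration. It assumes a planner that only picks stateful operators when it is told the plan is streaming, and shows why inferring that flag from a per-batch plan (whose source has already been replaced by batch data) loses the streaming operators.

```scala
// Toy model of the problem; every name here is hypothetical, none is a Spark API.
sealed trait Plan { def isStreaming: Boolean }
case class StreamingSource(name: String) extends Plan { val isStreaming = true }
case class BatchData(rows: Seq[String]) extends Plan { val isStreaming = false }
case class Aggregate(child: Plan) extends Plan {
  // An operator is streaming only if its input is streaming.
  def isStreaming: Boolean = child.isStreaming
}

object ExplainSketch {
  // Mimics a planner that chooses a stateful aggregate only in streaming mode.
  def plan(p: Plan, streaming: Boolean): String = p match {
    case Aggregate(child) if streaming => s"StateStoreSave(${plan(child, streaming)})"
    case Aggregate(child)              => s"HashAggregate(${plan(child, streaming)})"
    case StreamingSource(n)            => s"StreamingRelation($n)"
    case BatchData(_)                  => "LocalTableScan"
  }

  // Behaviour before the fix: the mode is inferred from the plan itself.
  def explainInferred(p: Plan): String = plan(p, p.isStreaming)

  // Behaviour after the fix: the caller states that this is a streaming batch.
  def explainStreaming(p: Plan): String = plan(p, streaming = true)

  def main(args: Array[String]): Unit = {
    // A per-batch plan: the streaming source was already swapped for batch data.
    val perBatch = Aggregate(BatchData(Seq("a", "b")))
    println(explainInferred(perBatch))  // stateful operator is missing
    println(explainStreaming(perBatch)) // stateful operator is present
  }
}
```

In this toy, `explainInferred` drops the stateful operator because `isStreaming` is false on the per-batch plan, while `explainStreaming` keeps it. That mirrors the difference the example outputs are meant to show.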

Examples of the explain outputs:

- streaming DataFrame.explain()
```
== Physical Plan ==
*HashAggregate(keys=[value#518], functions=[count(1)])
+- StateStoreSave [value#518], OperatorStateId(<unknown>,0,0), Append, 0
   +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
      +- StateStoreRestore [value#518], OperatorStateId(<unknown>,0,0)
         +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
            +- Exchange hashpartitioning(value#518, 5)
               +- *HashAggregate(keys=[value#518], functions=[partial_count(1)])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
                     +- *MapElements <function1>, obj#517: java.lang.String
                        +- *DeserializeToObject value#513.toString, obj#516: java.lang.String
                           +- StreamingRelation MemoryStream[value#513], [value#513]
```

- StreamingQuery.explain(extended = false)
```
== Physical Plan ==
*HashAggregate(keys=[value#518], functions=[count(1)])
+- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
   +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
      +- StateStoreRestore [value#518], OperatorStateId(...,0,0)
         +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
            +- Exchange hashpartitioning(value#518, 5)
               +- *HashAggregate(keys=[value#518], functions=[partial_count(1)])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
                     +- *MapElements <function1>, obj#517: java.lang.String
                        +- *DeserializeToObject value#543.toString, obj#516: java.lang.String
                           +- LocalTableScan [value#543]
```

- StreamingQuery.explain(extended = true)
```
== Parsed Logical Plan ==
Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
      +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String
         +- LocalRelation [value#543]

== Analyzed Logical Plan ==
value: string, count(1): bigint
Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
      +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String
         +- LocalRelation [value#543]

== Optimized Logical Plan ==
Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
      +- DeserializeToObject value#543.toString, obj#516: java.lang.String
         +- LocalRelation [value#543]

== Physical Plan ==
*HashAggregate(keys=[value#518], functions=[count(1)], output=[value#518, count(1)#524L])
+- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
   +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L])
      +- StateStoreRestore [value#518], OperatorStateId(...,0,0)
         +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L])
            +- Exchange hashpartitioning(value#518, 5)
               +- *HashAggregate(keys=[value#518], functions=[partial_count(1)], output=[value#518, count#530L])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
                     +- *MapElements <function1>, obj#517: java.lang.String
                        +- *DeserializeToObject value#543.toString, obj#516: java.lang.String
                           +- LocalTableScan [value#543]
```

## How was this patch tested?

The updated `explain` unit test in `StreamSuite`.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #16934 from zsxwing/SPARK-19603.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc02ef95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc02ef95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc02ef95

Branch: refs/heads/master
Commit: fc02ef95cdfc226603b52dc579b7133631f7143d
Parents: 08c1972
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Feb 15 20:51:33 2017 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Feb 15 20:51:33 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/command/commands.scala  | 28 +++++++++++++++++---
 .../execution/streaming/StreamExecution.scala   |  7 +++--
 .../spark/sql/streaming/StreamSuite.scala       | 28 +++++++++++++++++---
 3 files changed, 52 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fc02ef95/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 52d8dc2..58f5071 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -86,18 +86,18 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
  * }}}
  *
  * @param logicalPlan plan to explain
- * @param output output schema
  * @param extended whether to do extended explain or not
  * @param codegen whether to output generated code from whole-stage codegen or not
  */
 case class ExplainCommand(
     logicalPlan: LogicalPlan,
-    override val output: Seq[Attribute] =
-      Seq(AttributeReference("plan", StringType, nullable = true)()),
     extended: Boolean = false,
     codegen: Boolean = false)
   extends RunnableCommand {
 
+  override val output: Seq[Attribute] =
+    Seq(AttributeReference("plan", StringType, nullable = true)())
+
   // Run through the optimizer to generate the physical plan.
   override def run(sparkSession: SparkSession): Seq[Row] = try {
     val queryExecution =
@@ -121,3 +121,25 @@ case class ExplainCommand(
     ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
   }
 }
+
+/** An explain command for users to see how a streaming batch is executed. */
+case class StreamingExplainCommand(
+    queryExecution: IncrementalExecution,
+    extended: Boolean) extends RunnableCommand {
+
+  override val output: Seq[Attribute] =
+    Seq(AttributeReference("plan", StringType, nullable = true)())
+
+  // Run through the optimizer to generate the physical plan.
+  override def run(sparkSession: SparkSession): Seq[Row] = try {
+    val outputString =
+      if (extended) {
+        queryExecution.toString
+      } else {
+        queryExecution.simpleString
+      }
+    Seq(Row(outputString))
+  } catch { case cause: TreeNodeException[_] =>
+    ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fc02ef95/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 239d49b..e1af420 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.io.IOException
 import java.util.UUID
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.locks.ReentrantLock
@@ -33,7 +32,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
@@ -162,7 +161,7 @@ class StreamExecution(
   private var state: State = INITIALIZING
 
   @volatile
-  var lastExecution: QueryExecution = _
+  var lastExecution: IncrementalExecution = _
 
   /** Holds the most recent input data for each source. */
   protected var newData: Map[Source, DataFrame] = _
@@ -673,7 +672,7 @@ class StreamExecution(
     if (lastExecution == null) {
       "No physical plan. Waiting for data."
     } else {
-      val explain = ExplainCommand(lastExecution.logical, extended = extended)
+      val explain = StreamingExplainCommand(lastExecution, extended = extended)
       sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect()
         .map(_.getString(0)).mkString("\n")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/fc02ef95/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index f31dc8a..0296a2a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -22,7 +22,9 @@ import scala.util.control.ControlThrowable
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 
@@ -277,10 +279,24 @@ class StreamSuite extends StreamTest {
 
   test("explain") {
     val inputData = MemoryStream[String]
-    val df = inputData.toDS().map(_ + "foo")
-    // Test `explain` not throwing errors
-    df.explain()
-    val q = df.writeStream.queryName("memory_explain").format("memory").start()
+    val df = inputData.toDS().map(_ + "foo").groupBy("value").agg(count("*"))
+
+    // Test `df.explain`
+    val explain = ExplainCommand(df.queryExecution.logical, extended = false)
+    val explainString =
+      spark.sessionState
+        .executePlan(explain)
+        .executedPlan
+        .executeCollect()
+        .map(_.getString(0))
+        .mkString("\n")
+    assert(explainString.contains("StateStoreRestore"))
+    assert(explainString.contains("StreamingRelation"))
+    assert(!explainString.contains("LocalTableScan"))
+
+    // Test StreamingQuery.explain
+    val q = df.writeStream.queryName("memory_explain").outputMode("complete").format("memory")
+      .start()
       .asInstanceOf[StreamingQueryWrapper]
       .streamingQuery
     try {
@@ -294,12 +310,16 @@ class StreamSuite extends StreamTest {
       // `extended = false` only displays the physical plan.
       assert("LocalRelation".r.findAllMatchIn(explainWithoutExtended).size === 0)
       assert("LocalTableScan".r.findAllMatchIn(explainWithoutExtended).size === 1)
+      // Use "StateStoreRestore" to verify that it does output a streaming physical plan
+      assert(explainWithoutExtended.contains("StateStoreRestore"))
 
       val explainWithExtended = q.explainInternal(true)
       // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
       // plan.
       assert("LocalRelation".r.findAllMatchIn(explainWithExtended).size === 3)
       assert("LocalTableScan".r.findAllMatchIn(explainWithExtended).size === 1)
+      // Use "StateStoreRestore" to verify that it does output a streaming physical plan
+      assert(explainWithExtended.contains("StateStoreRestore"))
     } finally {
       q.stop()
     }

