You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/05/20 22:13:26 UTC

[spark] branch master updated: Revert "[SPARK-27439][SQL] Explainging Dataset should show correct resolved plans"

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 039db87  Revert "[SPARK-27439][SQL] Explainging Dataset should show correct resolved plans"
039db87 is described below

commit 039db879f476241efd8a6ff17c1f5998fa8187be
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Mon May 20 15:07:00 2019 -0700

    Revert "[SPARK-27439][SQL] Explainging Dataset should show correct resolved plans"
    
    This reverts commit 4b725e50a729af0708f456e27cc95a370b66c1ef.
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  2 +-
 .../spark/sql/execution/command/commands.scala     | 51 ++++++----------------
 .../org/apache/spark/sql/DataFrameSuite.scala      | 26 +----------
 3 files changed, 15 insertions(+), 64 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 74cb3e6..6d18d41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -498,7 +498,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def explain(extended: Boolean): Unit = {
-    val explain = ExplainCommand(queryExecution, extended = extended)
+    val explain = ExplainCommand(queryExecution.logical, extended = extended)
     sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
       // scalastyle:off println
       r => println(r.getString(0))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 8d7b3c5..a1f2785 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
-import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.debug._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -134,13 +134,13 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
  *   EXPLAIN (EXTENDED | CODEGEN) SELECT * FROM ...
  * }}}
  *
- * @param queryExecution the query execution object of the plan to explain
+ * @param logicalPlan plan to explain
  * @param extended whether to do extended explain or not
  * @param codegen whether to output generated code from whole-stage codegen or not
  * @param cost whether to show cost information for operators.
  */
 case class ExplainCommand(
-    queryExecution: QueryExecution,
+    logicalPlan: LogicalPlan,
     extended: Boolean = false,
     codegen: Boolean = false,
     cost: Boolean = false)
@@ -151,6 +151,16 @@ case class ExplainCommand(
 
   // Run through the optimizer to generate the physical plan.
   override def run(sparkSession: SparkSession): Seq[Row] = try {
+    val queryExecution =
+      if (logicalPlan.isStreaming) {
+        // This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the
+        // output mode does not matter since there is no `Sink`.
+        new IncrementalExecution(
+          sparkSession, logicalPlan, OutputMode.Append(), "<unknown>",
+          UUID.randomUUID, UUID.randomUUID, 0, OffsetSeqMetadata(0, 0))
+      } else {
+        sparkSession.sessionState.executePlan(logicalPlan)
+      }
     val outputString =
       if (codegen) {
         codegenString(queryExecution.executedPlan)
@@ -167,41 +177,6 @@ case class ExplainCommand(
   }
 }
 
-object ExplainCommand {
-  /**
-   * Initializes an `ExplainCommand` object by passing `LogicalPlan`. This logical plan will be
-   * run through the analyzer and optimizer when this command is actually run.
-   */
-  def apply(
-      logicalPlan: LogicalPlan,
-      extended: Boolean,
-      codegen: Boolean,
-      cost: Boolean): ExplainCommand = {
-    val sparkSession = SparkSession.active
-
-    val queryExecution =
-      if (logicalPlan.isStreaming) {
-        // This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the
-        // output mode does not matter since there is no `Sink`.
-        new IncrementalExecution(
-          sparkSession, logicalPlan, OutputMode.Append(), "<unknown>",
-          UUID.randomUUID, UUID.randomUUID, 0, OffsetSeqMetadata(0, 0))
-      } else {
-        sparkSession.sessionState.executePlan(logicalPlan)
-      }
-    new ExplainCommand(queryExecution, extended, codegen, cost)
-  }
-
-  /**
-   * This is mainly used for tests.
-   */
-  def apply(
-      logicalPlan: LogicalPlan,
-      extended: Boolean): ExplainCommand = {
-    ExplainCommand(logicalPlan, extended, codegen = false, cost = false)
-  }
-}
-
 /** An explain command for users to see how a streaming batch is executed. */
 case class StreamingExplainCommand(
     queryExecution: IncrementalExecution,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index dd3740d..8cc84ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import java.io.{ByteArrayOutputStream, File}
+import java.io.File
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.util.UUID
@@ -2134,30 +2134,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-27439: Explain result should match collected result after view change") {
-    withTempView("test", "test2", "tmp") {
-      spark.range(10).createOrReplaceTempView("test")
-      spark.range(5).createOrReplaceTempView("test2")
-      spark.sql("select * from test").createOrReplaceTempView("tmp")
-      val df = spark.sql("select * from tmp")
-      spark.sql("select * from test2").createOrReplaceTempView("tmp")
-
-      val captured = new ByteArrayOutputStream()
-      Console.withOut(captured) {
-        df.explain(extended = true)
-      }
-      checkAnswer(df, spark.range(10).toDF)
-      val output = captured.toString
-      assert(output.contains(
-        """== Parsed Logical Plan ==
-          |'Project [*]
-          |+- 'UnresolvedRelation `tmp`""".stripMargin))
-      assert(output.contains(
-        """== Physical Plan ==
-          |*(1) Range (0, 10, step=1, splits=2)""".stripMargin))
-    }
-  }
-
   test("SPARK-27671: Fix analysis exception when casting null in nested field in struct") {
     val df = sql("SELECT * FROM VALUES (('a', (10, null))), (('b', (10, 50))), " +
       "(('c', null)) AS tab(x, y)")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org