You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@spark.apache.org by do...@apache.org on 2019/05/20 22:13:26 UTC
[spark] branch master updated: Revert "[SPARK-27439][SQL] Explainging Dataset should show correct resolved plans"
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 039db87 Revert "[SPARK-27439][SQL] Explainging Dataset should show correct resolved plans"
039db87 is described below
commit 039db879f476241efd8a6ff17c1f5998fa8187be
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Mon May 20 15:07:00 2019 -0700
Revert "[SPARK-27439][SQL] Explainging Dataset should show correct resolved plans"
This reverts commit 4b725e50a729af0708f456e27cc95a370b66c1ef.
---
.../main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
.../spark/sql/execution/command/commands.scala | 51 ++++++----------------
.../org/apache/spark/sql/DataFrameSuite.scala | 26 +----------
3 files changed, 15 insertions(+), 64 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 74cb3e6..6d18d41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -498,7 +498,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def explain(extended: Boolean): Unit = {
- val explain = ExplainCommand(queryExecution, extended = extended)
+ val explain = ExplainCommand(queryExecution.logical, extended = extended)
sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
// scalastyle:off println
r => println(r.getString(0))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 8d7b3c5..a1f2785 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
-import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -134,13 +134,13 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
* EXPLAIN (EXTENDED | CODEGEN) SELECT * FROM ...
* }}}
*
- * @param queryExecution the query execution object of the plan to explain
+ * @param logicalPlan plan to explain
* @param extended whether to do extended explain or not
* @param codegen whether to output generated code from whole-stage codegen or not
* @param cost whether to show cost information for operators.
*/
case class ExplainCommand(
- queryExecution: QueryExecution,
+ logicalPlan: LogicalPlan,
extended: Boolean = false,
codegen: Boolean = false,
cost: Boolean = false)
@@ -151,6 +151,16 @@ case class ExplainCommand(
// Run through the optimizer to generate the physical plan.
override def run(sparkSession: SparkSession): Seq[Row] = try {
+ val queryExecution =
+ if (logicalPlan.isStreaming) {
+ // This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the
+ // output mode does not matter since there is no `Sink`.
+ new IncrementalExecution(
+ sparkSession, logicalPlan, OutputMode.Append(), "<unknown>",
+ UUID.randomUUID, UUID.randomUUID, 0, OffsetSeqMetadata(0, 0))
+ } else {
+ sparkSession.sessionState.executePlan(logicalPlan)
+ }
val outputString =
if (codegen) {
codegenString(queryExecution.executedPlan)
@@ -167,41 +177,6 @@ case class ExplainCommand(
}
}
-object ExplainCommand {
- /**
- * Initializes an `ExplainCommand` object by passing `LogicalPlan`. This logical plan will be
- * run through the analyzer and optimizer when this command is actually run.
- */
- def apply(
- logicalPlan: LogicalPlan,
- extended: Boolean,
- codegen: Boolean,
- cost: Boolean): ExplainCommand = {
- val sparkSession = SparkSession.active
-
- val queryExecution =
- if (logicalPlan.isStreaming) {
- // This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the
- // output mode does not matter since there is no `Sink`.
- new IncrementalExecution(
- sparkSession, logicalPlan, OutputMode.Append(), "<unknown>",
- UUID.randomUUID, UUID.randomUUID, 0, OffsetSeqMetadata(0, 0))
- } else {
- sparkSession.sessionState.executePlan(logicalPlan)
- }
- new ExplainCommand(queryExecution, extended, codegen, cost)
- }
-
- /**
- * This is mainly used for tests.
- */
- def apply(
- logicalPlan: LogicalPlan,
- extended: Boolean): ExplainCommand = {
- ExplainCommand(logicalPlan, extended, codegen = false, cost = false)
- }
-}
-
/** An explain command for users to see how a streaming batch is executed. */
case class StreamingExplainCommand(
queryExecution: IncrementalExecution,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index dd3740d..8cc84ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql
-import java.io.{ByteArrayOutputStream, File}
+import java.io.File
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.util.UUID
@@ -2134,30 +2134,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
}
- test("SPARK-27439: Explain result should match collected result after view change") {
- withTempView("test", "test2", "tmp") {
- spark.range(10).createOrReplaceTempView("test")
- spark.range(5).createOrReplaceTempView("test2")
- spark.sql("select * from test").createOrReplaceTempView("tmp")
- val df = spark.sql("select * from tmp")
- spark.sql("select * from test2").createOrReplaceTempView("tmp")
-
- val captured = new ByteArrayOutputStream()
- Console.withOut(captured) {
- df.explain(extended = true)
- }
- checkAnswer(df, spark.range(10).toDF)
- val output = captured.toString
- assert(output.contains(
- """== Parsed Logical Plan ==
- |'Project [*]
- |+- 'UnresolvedRelation `tmp`""".stripMargin))
- assert(output.contains(
- """== Physical Plan ==
- |*(1) Range (0, 10, step=1, splits=2)""".stripMargin))
- }
- }
-
test("SPARK-27671: Fix analysis exception when casting null in nested field in struct") {
val df = sql("SELECT * FROM VALUES (('a', (10, null))), (('b', (10, 50))), " +
"(('c', null)) AS tab(x, y)")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org