You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/31 15:43:10 UTC

[GitHub] asfgit closed pull request #23406: [SPARK-26504][SQL] Rope-wise dumping of Spark plans

asfgit closed pull request #23406: [SPARK-26504][SQL] Rope-wise dumping of Spark plans
URL: https://github.com/apache/spark/pull/23406
 
 
   

This pull request was merged from a forked repository (a foreign pull request).
Because GitHub does not show the original diff of such a pull request once it
has been merged, the diff is reproduced below for the sake of provenance:

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 125181fb213f8..8f5444ed8a5a7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
 import org.apache.spark.sql.internal.SQLConf
@@ -301,4 +302,20 @@ object QueryPlan extends PredicateHelper {
       Nil
     }
   }
+
+  /**
+   * Converts the query plan to a string and appends it via the provided function.
+   */
+  def append[T <: QueryPlan[T]](
+      plan: => QueryPlan[T],
+      append: String => Unit,
+      verbose: Boolean,
+      addSuffix: Boolean,
+      maxFields: Int = SQLConf.get.maxToStringFields): Unit = {
+    try {
+      plan.treeString(append, verbose, addSuffix, maxFields)
+    } catch {
+      case e: AnalysisException => append(e.toString)
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 21e59bbd283e4..570a019b2af77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.sql.catalyst.trees
 
-import java.io.Writer
 import java.util.UUID
 
 import scala.collection.Map
 import scala.reflect.ClassTag
 
-import org.apache.commons.io.output.StringBuilderWriter
 import org.apache.commons.lang3.ClassUtils
 import org.json4s.JsonAST._
 import org.json4s.JsonDSL._
@@ -37,6 +35,7 @@ import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
+import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -481,21 +480,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       verbose: Boolean,
       addSuffix: Boolean = false,
       maxFields: Int = SQLConf.get.maxToStringFields): String = {
-    val writer = new StringBuilderWriter()
-    try {
-      treeString(writer, verbose, addSuffix, maxFields)
-      writer.toString
-    } finally {
-      writer.close()
-    }
+    val concat = new StringConcat()
+
+    treeString(concat.append, verbose, addSuffix, maxFields)
+    concat.toString
   }
 
   def treeString(
-      writer: Writer,
+      append: String => Unit,
       verbose: Boolean,
       addSuffix: Boolean,
       maxFields: Int): Unit = {
-    generateTreeString(0, Nil, writer, verbose, "", addSuffix, maxFields)
+    generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields)
   }
 
   /**
@@ -558,7 +554,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      writer: Writer,
+      append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
       addSuffix: Boolean = false,
@@ -566,9 +562,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
 
     if (depth > 0) {
       lastChildren.init.foreach { isLast =>
-        writer.write(if (isLast) "   " else ":  ")
+        append(if (isLast) "   " else ":  ")
       }
-      writer.write(if (lastChildren.last) "+- " else ":- ")
+      append(if (lastChildren.last) "+- " else ":- ")
     }
 
     val str = if (verbose) {
@@ -576,24 +572,24 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     } else {
       simpleString(maxFields)
     }
-    writer.write(prefix)
-    writer.write(str)
-    writer.write("\n")
+    append(prefix)
+    append(str)
+    append("\n")
 
     if (innerChildren.nonEmpty) {
       innerChildren.init.foreach(_.generateTreeString(
-        depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
+        depth + 2, lastChildren :+ children.isEmpty :+ false, append, verbose,
         addSuffix = addSuffix, maxFields = maxFields))
       innerChildren.last.generateTreeString(
-        depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
+        depth + 2, lastChildren :+ children.isEmpty :+ true, append, verbose,
         addSuffix = addSuffix, maxFields = maxFields)
     }
 
     if (children.nonEmpty) {
       children.init.foreach(_.generateTreeString(
-        depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix, maxFields))
+        depth + 1, lastChildren :+ false, append, verbose, prefix, addSuffix, maxFields))
       children.last.generateTreeString(
-        depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix, maxFields)
+        depth + 1, lastChildren :+ true, append, verbose, prefix, addSuffix, maxFields)
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index bc861a805ce61..643b83b1741ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util
 
 import java.util.regex.{Pattern, PatternSyntaxException}
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -87,4 +89,34 @@ object StringUtils {
     }
     funcNames.toSeq
   }
+
+  /**
+   * Concatenates a sequence of strings into a final string, with a cheap append method
+   * and one memory allocation for the final string.
+   */
+  class StringConcat {
+    private val strings = new ArrayBuffer[String]
+    private var length: Int = 0
+
+    /**
+     * Appends a string and accumulates its length to allocate a string buffer for all
+     * appended strings once in the toString method.
+     */
+    def append(s: String): Unit = {
+      if (s != null) {
+        strings.append(s)
+        length += s.length
+      }
+    }
+
+    /**
+     * Allocates memory for all appended strings, writes them into that memory, and
+     * returns the concatenated string.
+     */
+    override def toString: String = {
+      val result = new java.lang.StringBuilder(length)
+      strings.foreach(result.append)
+      result.toString
+    }
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
index 78fee5135c3ae..616ec12032dbd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
@@ -43,4 +43,17 @@ class StringUtilsSuite extends SparkFunSuite {
     assert(filterPattern(names, " a. ") === Seq("a1", "a2"))
     assert(filterPattern(names, " d* ") === Nil)
   }
+
+  test("string concatenation") {
+    def concat(seq: String*): String = {
+      seq.foldLeft(new StringConcat())((acc, s) => {acc.append(s); acc}).toString
+    }
+
+    assert(new StringConcat().toString == "")
+    assert(concat("") == "")
+    assert(concat(null) == "")
+    assert(concat("a") == "a")
+    assert(concat("1", "2") == "12")
+    assert(concat("abc", "\n", "123") == "abc\n123")
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 9b8d2e830867d..7fccbf65d8525 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -17,20 +17,21 @@
 
 package org.apache.spark.sql.execution
 
-import java.io.{BufferedWriter, OutputStreamWriter, Writer}
+import java.io.{BufferedWriter, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
-import org.apache.commons.io.output.StringBuilderWriter
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
@@ -108,10 +109,6 @@ class QueryExecution(
     ReuseExchange(sparkSession.sessionState.conf),
     ReuseSubquery(sparkSession.sessionState.conf))
 
-  protected def stringOrError[A](f: => A): String =
-    try f.toString catch { case e: AnalysisException => e.toString }
-
-
   /**
    * Returns the result as a hive compatible sequence of strings. This is used in tests and
    * `SparkSQLDriver` for CLI applications.
@@ -197,55 +194,53 @@ class QueryExecution(
   }
 
   def simpleString: String = withRedaction {
-    s"""== Physical Plan ==
-       |${stringOrError(executedPlan.treeString(verbose = false))}
-      """.stripMargin.trim
-  }
-
-  private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
-    try f(writer)
-    catch {
-      case e: AnalysisException => writer.write(e.toString)
-    }
+    val concat = new StringConcat()
+    concat.append("== Physical Plan ==\n")
+    QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
+    concat.append("\n")
+    concat.toString
   }
 
-  private def writePlans(writer: Writer, maxFields: Int): Unit = {
+  private def writePlans(append: String => Unit, maxFields: Int): Unit = {
     val (verbose, addSuffix) = (true, false)
-
-    writer.write("== Parsed Logical Plan ==\n")
-    writeOrError(writer)(logical.treeString(_, verbose, addSuffix, maxFields))
-    writer.write("\n== Analyzed Logical Plan ==\n")
-    val analyzedOutput = stringOrError(truncatedString(
-      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields))
-    writer.write(analyzedOutput)
-    writer.write("\n")
-    writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix, maxFields))
-    writer.write("\n== Optimized Logical Plan ==\n")
-    writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix, maxFields))
-    writer.write("\n== Physical Plan ==\n")
-    writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix, maxFields))
+    append("== Parsed Logical Plan ==\n")
+    QueryPlan.append(logical, append, verbose, addSuffix, maxFields)
+    append("\n== Analyzed Logical Plan ==\n")
+    val analyzedOutput = try {
+      truncatedString(
+        analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields)
+    } catch {
+      case e: AnalysisException => e.toString
+    }
+    append(analyzedOutput)
+    append("\n")
+    QueryPlan.append(analyzed, append, verbose, addSuffix, maxFields)
+    append("\n== Optimized Logical Plan ==\n")
+    QueryPlan.append(optimizedPlan, append, verbose, addSuffix, maxFields)
+    append("\n== Physical Plan ==\n")
+    QueryPlan.append(executedPlan, append, verbose, addSuffix, maxFields)
   }
 
   override def toString: String = withRedaction {
-    val writer = new StringBuilderWriter()
-    try {
-      writePlans(writer, SQLConf.get.maxToStringFields)
-      writer.toString
-    } finally {
-      writer.close()
-    }
+    val concat = new StringConcat()
+    writePlans(concat.append, SQLConf.get.maxToStringFields)
+    concat.toString
   }
 
   def stringWithStats: String = withRedaction {
+    val concat = new StringConcat()
+    val maxFields = SQLConf.get.maxToStringFields
+
     // trigger to compute stats for logical plans
     optimizedPlan.stats
 
     // only show optimized logical plan and physical plan
-    s"""== Optimized Logical Plan ==
-        |${stringOrError(optimizedPlan.treeString(verbose = true, addSuffix = true))}
-        |== Physical Plan ==
-        |${stringOrError(executedPlan.treeString(verbose = true))}
-    """.stripMargin.trim
+    concat.append("== Optimized Logical Plan ==\n")
+    QueryPlan.append(optimizedPlan, concat.append, verbose = true, addSuffix = true, maxFields)
+    concat.append("\n== Physical Plan ==\n")
+    QueryPlan.append(executedPlan, concat.append, verbose = true, addSuffix = false, maxFields)
+    concat.append("\n")
+    concat.toString
   }
 
   /**
@@ -282,7 +277,7 @@ class QueryExecution(
     /**
      * Dumps debug information about query execution into the specified file.
      *
-     * @param maxFields maximim number of fields converted to string representation.
+     * @param maxFields maximum number of fields converted to string representation.
      */
     def toFile(path: String, maxFields: Int = Int.MaxValue): Unit = {
       val filePath = new Path(path)
@@ -290,9 +285,9 @@ class QueryExecution(
       val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
 
       try {
-        writePlans(writer, maxFields)
+        writePlans(writer.write, maxFields)
         writer.write("\n== Whole Stage Codegen ==\n")
-        org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan)
+        org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
       } finally {
         writer.close()
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index f4927dedabe56..3b0a99669ccd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -493,7 +493,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
   override def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      writer: Writer,
+      append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
       addSuffix: Boolean = false,
@@ -501,7 +501,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
     child.generateTreeString(
       depth,
       lastChildren,
-      writer,
+      append,
       verbose,
       prefix = "",
       addSuffix = false,
@@ -777,7 +777,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
   override def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      writer: Writer,
+      append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
       addSuffix: Boolean = false,
@@ -785,7 +785,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
     child.generateTreeString(
       depth,
       lastChildren,
-      writer,
+      append,
       verbose,
       s"*($codegenStageId) ",
       false,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index ae8197f617a28..53b74c7c85594 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -17,13 +17,10 @@
 
 package org.apache.spark.sql.execution
 
-import java.io.Writer
 import java.util.Collections
 
 import scala.collection.JavaConverters._
 
-import org.apache.commons.io.output.StringBuilderWriter
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -32,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
+import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQuery
@@ -73,24 +71,19 @@ package object debug {
    * @return single String containing all WholeStageCodegen subtrees and corresponding codegen
    */
   def codegenString(plan: SparkPlan): String = {
-    val writer = new StringBuilderWriter()
-
-    try {
-      writeCodegen(writer, plan)
-      writer.toString
-    } finally {
-      writer.close()
-    }
+    val concat = new StringConcat()
+    writeCodegen(concat.append, plan)
+    concat.toString
   }
 
-  def writeCodegen(writer: Writer, plan: SparkPlan): Unit = {
+  def writeCodegen(append: String => Unit, plan: SparkPlan): Unit = {
     val codegenSeq = codegenStringSeq(plan)
-    writer.write(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
+    append(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
     for (((subtree, code), i) <- codegenSeq.zipWithIndex) {
-      writer.write(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
-      writer.write(subtree)
-      writer.write("\nGenerated code:\n")
-      writer.write(s"${code}\n")
+      append(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
+      append(subtree)
+      append("\nGenerated code:\n")
+      append(s"${code}\n")
     }
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org