Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/31 15:43:10 UTC
[GitHub] asfgit closed pull request #23406: [SPARK-26504][SQL] Rope-wise dumping of Spark plans
URL: https://github.com/apache/spark/pull/23406
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 125181fb213f8..8f5444ed8a5a7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.plans
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
import org.apache.spark.sql.internal.SQLConf
@@ -301,4 +302,20 @@ object QueryPlan extends PredicateHelper {
Nil
}
}
+
+ /**
+ * Converts the query plan to string and appends it via provided function.
+ */
+ def append[T <: QueryPlan[T]](
+ plan: => QueryPlan[T],
+ append: String => Unit,
+ verbose: Boolean,
+ addSuffix: Boolean,
+ maxFields: Int = SQLConf.get.maxToStringFields): Unit = {
+ try {
+ plan.treeString(append, verbose, addSuffix, maxFields)
+ } catch {
+ case e: AnalysisException => append(e.toString)
+ }
+ }
}
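
Editor's note: the new QueryPlan.append helper centralizes error-tolerant plan dumping (it replaces the stringOrError/writeOrError helpers removed from QueryExecution.scala further down). Instead of eagerly building a plan string and catching failures around toString, the plan (a by-name parameter) writes itself through a String => Unit callback, and an AnalysisException raised while forcing it is rendered into the output rather than aborting the dump. A minimal standalone sketch of the same pattern, in plain Scala with RuntimeException as a stand-in for AnalysisException (names are illustrative):

    def appendOrError(compute: => String, append: String => Unit): Unit =
      try append(compute)
      catch { case e: RuntimeException => append(e.toString) }

    val out = new StringBuilder
    appendOrError(throw new RuntimeException("unresolved plan"), out.append(_))
    // out now holds the exception text instead of a half-written dump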
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 21e59bbd283e4..570a019b2af77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -17,13 +17,11 @@
package org.apache.spark.sql.catalyst.trees
-import java.io.Writer
import java.util.UUID
import scala.collection.Map
import scala.reflect.ClassTag
-import org.apache.commons.io.output.StringBuilderWriter
import org.apache.commons.lang3.ClassUtils
import org.json4s.JsonAST._
import org.json4s.JsonDSL._
@@ -37,6 +35,7 @@ import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
+import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -481,21 +480,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
verbose: Boolean,
addSuffix: Boolean = false,
maxFields: Int = SQLConf.get.maxToStringFields): String = {
- val writer = new StringBuilderWriter()
- try {
- treeString(writer, verbose, addSuffix, maxFields)
- writer.toString
- } finally {
- writer.close()
- }
+ val concat = new StringConcat()
+
+ treeString(concat.append, verbose, addSuffix, maxFields)
+ concat.toString
}
def treeString(
- writer: Writer,
+ append: String => Unit,
verbose: Boolean,
addSuffix: Boolean,
maxFields: Int): Unit = {
- generateTreeString(0, Nil, writer, verbose, "", addSuffix, maxFields)
+ generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields)
}
/**
@@ -558,7 +554,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
- writer: Writer,
+ append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
@@ -566,9 +562,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
if (depth > 0) {
lastChildren.init.foreach { isLast =>
- writer.write(if (isLast) "   " else ":  ")
+ append(if (isLast) "   " else ":  ")
}
- writer.write(if (lastChildren.last) "+- " else ":- ")
+ append(if (lastChildren.last) "+- " else ":- ")
}
val str = if (verbose) {
@@ -576,24 +572,24 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
} else {
simpleString(maxFields)
}
- writer.write(prefix)
- writer.write(str)
- writer.write("\n")
+ append(prefix)
+ append(str)
+ append("\n")
if (innerChildren.nonEmpty) {
innerChildren.init.foreach(_.generateTreeString(
- depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
+ depth + 2, lastChildren :+ children.isEmpty :+ false, append, verbose,
addSuffix = addSuffix, maxFields = maxFields))
innerChildren.last.generateTreeString(
- depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
+ depth + 2, lastChildren :+ children.isEmpty :+ true, append, verbose,
addSuffix = addSuffix, maxFields = maxFields)
}
if (children.nonEmpty) {
children.init.foreach(_.generateTreeString(
- depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix, maxFields))
+ depth + 1, lastChildren :+ false, append, verbose, prefix, addSuffix, maxFields))
children.last.generateTreeString(
- depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix, maxFields)
+ depth + 1, lastChildren :+ true, append, verbose, prefix, addSuffix, maxFields)
}
}
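
Editor's note: treeString and generateTreeString now take a String => Unit callback instead of a java.io.Writer, so one recursive renderer can target an in-memory rope, a file writer, or any other sink. A rough standalone sketch of the recursion's shape, mirroring the depth/lastChildren bookkeeping above (the Node type is illustrative, not Spark's):

    case class Node(label: String, children: Seq[Node])

    def render(node: Node, depth: Int, lastChildren: Seq[Boolean], append: String => Unit): Unit = {
      if (depth > 0) {
        lastChildren.init.foreach(isLast => append(if (isLast) "   " else ":  "))
        append(if (lastChildren.last) "+- " else ":- ")
      }
      append(node.label)
      append("\n")
      if (node.children.nonEmpty) {
        node.children.init.foreach(render(_, depth + 1, lastChildren :+ false, append))
        render(node.children.last, depth + 1, lastChildren :+ true, append)
      }
    }

    val sb = new StringBuilder
    render(Node("Project", Seq(Node("Filter", Seq(Node("Scan", Nil))))), 0, Nil, sb.append(_))
    // Project
    // +- Filter
    //    +- Scan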
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index bc861a805ce61..643b83b1741ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util
import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable.ArrayBuffer
+
import org.apache.spark.sql.AnalysisException
import org.apache.spark.unsafe.types.UTF8String
@@ -87,4 +89,34 @@ object StringUtils {
}
funcNames.toSeq
}
+
+ /**
+ * Concatenation of sequence of strings to final string with cheap append method
+ * and one memory allocation for the final string.
+ */
+ class StringConcat {
+ private val strings = new ArrayBuffer[String]
+ private var length: Int = 0
+
+ /**
+ * Appends a string and accumulates its length to allocate a string buffer for all
+ * appended strings once in the toString method.
+ */
+ def append(s: String): Unit = {
+ if (s != null) {
+ strings.append(s)
+ length += s.length
+ }
+ }
+
+ /**
+ * The method allocates memory for all appended strings, writes them to the memory and
+ * returns concatenated string.
+ */
+ override def toString: String = {
+ val result = new java.lang.StringBuilder(length)
+ strings.foreach(result.append)
+ result.toString
+ }
+ }
}
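
Editor's note: StringConcat is the "rope" of the PR title: append is a cheap buffer push plus a running length, and toString performs exactly one allocation of the final size, avoiding the repeated copying a chain of naive String concatenations can incur. Typical usage, grounded in how the rest of this diff calls it:

    import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat

    val concat = new StringConcat()
    concat.append("== Physical Plan ==\n")
    concat.append("*(1) Range (0, 10, step=1, splits=8)\n")
    val dump: String = concat.toString  // single allocation sized by the accumulated length

Because concat.append eta-expands to a String => Unit, it can be handed directly to the new treeString, writePlans, and writeCodegen entry points.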
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
index 78fee5135c3ae..616ec12032dbd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
@@ -43,4 +43,17 @@ class StringUtilsSuite extends SparkFunSuite {
assert(filterPattern(names, " a. ") === Seq("a1", "a2"))
assert(filterPattern(names, " d* ") === Nil)
}
+
+ test("string concatenation") {
+ def concat(seq: String*): String = {
+ seq.foldLeft(new StringConcat())((acc, s) => {acc.append(s); acc}).toString
+ }
+
+ assert(new StringConcat().toString == "")
+ assert(concat("") == "")
+ assert(concat(null) == "")
+ assert(concat("a") == "a")
+ assert(concat("1", "2") == "12")
+ assert(concat("abc", "\n", "123") == "abc\n123")
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 9b8d2e830867d..7fccbf65d8525 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -17,20 +17,21 @@
package org.apache.spark.sql.execution
-import java.io.{BufferedWriter, OutputStreamWriter, Writer}
+import java.io.{BufferedWriter, OutputStreamWriter}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
-import org.apache.commons.io.output.StringBuilderWriter
import org.apache.hadoop.fs.Path
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
+import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
@@ -108,10 +109,6 @@ class QueryExecution(
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf))
- protected def stringOrError[A](f: => A): String =
- try f.toString catch { case e: AnalysisException => e.toString }
-
-
/**
* Returns the result as a hive compatible sequence of strings. This is used in tests and
* `SparkSQLDriver` for CLI applications.
@@ -197,55 +194,53 @@ class QueryExecution(
}
def simpleString: String = withRedaction {
- s"""== Physical Plan ==
- |${stringOrError(executedPlan.treeString(verbose = false))}
- """.stripMargin.trim
- }
-
- private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
- try f(writer)
- catch {
- case e: AnalysisException => writer.write(e.toString)
- }
+ val concat = new StringConcat()
+ concat.append("== Physical Plan ==\n")
+ QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
+ concat.append("\n")
+ concat.toString
}
- private def writePlans(writer: Writer, maxFields: Int): Unit = {
+ private def writePlans(append: String => Unit, maxFields: Int): Unit = {
val (verbose, addSuffix) = (true, false)
-
- writer.write("== Parsed Logical Plan ==\n")
- writeOrError(writer)(logical.treeString(_, verbose, addSuffix, maxFields))
- writer.write("\n== Analyzed Logical Plan ==\n")
- val analyzedOutput = stringOrError(truncatedString(
- analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields))
- writer.write(analyzedOutput)
- writer.write("\n")
- writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix, maxFields))
- writer.write("\n== Optimized Logical Plan ==\n")
- writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix, maxFields))
- writer.write("\n== Physical Plan ==\n")
- writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix, maxFields))
+ append("== Parsed Logical Plan ==\n")
+ QueryPlan.append(logical, append, verbose, addSuffix, maxFields)
+ append("\n== Analyzed Logical Plan ==\n")
+ val analyzedOutput = try {
+ truncatedString(
+ analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields)
+ } catch {
+ case e: AnalysisException => e.toString
+ }
+ append(analyzedOutput)
+ append("\n")
+ QueryPlan.append(analyzed, append, verbose, addSuffix, maxFields)
+ append("\n== Optimized Logical Plan ==\n")
+ QueryPlan.append(optimizedPlan, append, verbose, addSuffix, maxFields)
+ append("\n== Physical Plan ==\n")
+ QueryPlan.append(executedPlan, append, verbose, addSuffix, maxFields)
}
override def toString: String = withRedaction {
- val writer = new StringBuilderWriter()
- try {
- writePlans(writer, SQLConf.get.maxToStringFields)
- writer.toString
- } finally {
- writer.close()
- }
+ val concat = new StringConcat()
+ writePlans(concat.append, SQLConf.get.maxToStringFields)
+ concat.toString
}
def stringWithStats: String = withRedaction {
+ val concat = new StringConcat()
+ val maxFields = SQLConf.get.maxToStringFields
+
// trigger to compute stats for logical plans
optimizedPlan.stats
// only show optimized logical plan and physical plan
- s"""== Optimized Logical Plan ==
- |${stringOrError(optimizedPlan.treeString(verbose = true, addSuffix = true))}
- |== Physical Plan ==
- |${stringOrError(executedPlan.treeString(verbose = true))}
- """.stripMargin.trim
+ concat.append("== Optimized Logical Plan ==\n")
+ QueryPlan.append(optimizedPlan, concat.append, verbose = true, addSuffix = true, maxFields)
+ concat.append("\n== Physical Plan ==\n")
+ QueryPlan.append(executedPlan, concat.append, verbose = true, addSuffix = false, maxFields)
+ concat.append("\n")
+ concat.toString
}
/**
@@ -282,7 +277,7 @@ class QueryExecution(
/**
* Dumps debug information about query execution into the specified file.
*
- * @param maxFields maximim number of fields converted to string representation.
+ * @param maxFields maximum number of fields converted to string representation.
*/
def toFile(path: String, maxFields: Int = Int.MaxValue): Unit = {
val filePath = new Path(path)
@@ -290,9 +285,9 @@ class QueryExecution(
val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
try {
- writePlans(writer, maxFields)
+ writePlans(writer.write, maxFields)
writer.write("\n== Whole Stage Codegen ==\n")
- org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan)
+ org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
} finally {
writer.close()
}
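
Editor's note: the payoff shows at the end of this file. The same writePlans logic now feeds either an in-memory StringConcat (toString, stringWithStats) or a BufferedWriter over Hadoop's FileSystem (toFile), without ever materializing the whole dump as an intermediate string in the file case. The two call shapes, sketched (writePlans is private to QueryExecution, and fs/filePath follow the surrounding context, so this is illustrative only):

    // In-memory: accumulate into the rope, materialize once.
    val concat = new StringConcat()
    writePlans(concat.append, SQLConf.get.maxToStringFields)
    val planText = concat.toString

    // Streaming: write straight to the file sink.
    val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
    try writePlans(writer.write, Int.MaxValue)
    finally writer.close()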
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index f4927dedabe56..3b0a99669ccd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -493,7 +493,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
- writer: Writer,
+ append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
@@ -501,7 +501,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
child.generateTreeString(
depth,
lastChildren,
- writer,
+ append,
verbose,
prefix = "",
addSuffix = false,
@@ -777,7 +777,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
- writer: Writer,
+ append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
@@ -785,7 +785,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
child.generateTreeString(
depth,
lastChildren,
- writer,
+ append,
verbose,
s"*($codegenStageId) ",
false,
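
Editor's note: these two overrides keep codegen wrapper nodes out of the dump. InputAdapter and WholeStageCodegenExec both delegate rendering to their child through the same append callback, with WholeStageCodegenExec injecting the s"*($codegenStageId) " prefix, so a whole-stage-codegen'd scan renders as a single prefixed line, e.g. (shape illustrative):

    *(1) Range (0, 10, step=1, splits=8)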
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index ae8197f617a28..53b74c7c85594 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -17,13 +17,10 @@
package org.apache.spark.sql.execution
-import java.io.Writer
import java.util.Collections
import scala.collection.JavaConverters._
-import org.apache.commons.io.output.StringBuilderWriter
-
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
@@ -32,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
+import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQuery
@@ -73,24 +71,19 @@ package object debug {
* @return single String containing all WholeStageCodegen subtrees and corresponding codegen
*/
def codegenString(plan: SparkPlan): String = {
- val writer = new StringBuilderWriter()
-
- try {
- writeCodegen(writer, plan)
- writer.toString
- } finally {
- writer.close()
- }
+ val concat = new StringConcat()
+ writeCodegen(concat.append, plan)
+ concat.toString
}
- def writeCodegen(writer: Writer, plan: SparkPlan): Unit = {
+ def writeCodegen(append: String => Unit, plan: SparkPlan): Unit = {
val codegenSeq = codegenStringSeq(plan)
- writer.write(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
+ append(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
for (((subtree, code), i) <- codegenSeq.zipWithIndex) {
- writer.write(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
- writer.write(subtree)
- writer.write("\nGenerated code:\n")
- writer.write(s"${code}\n")
+ append(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
+ append(subtree)
+ append("\nGenerated code:\n")
+ append(s"${code}\n")
}
}
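
Editor's note: writeCodegen completes the picture. codegenString is now just writeCodegen aimed at a StringConcat, and the same function can feed any other sink. A sketch of both uses (plan is some SparkPlan in scope; print stands in for an arbitrary consumer):

    import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
    import org.apache.spark.sql.execution.debug

    // Buffer, exactly as codegenString does internally:
    val concat = new StringConcat()
    debug.writeCodegen(concat.append, plan)
    val text = concat.toString

    // Or stream each chunk directly to stdout:
    debug.writeCodegen(print, plan)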
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org