You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/03/13 16:59:22 UTC
[spark] branch master updated: [SPARK-26103][SQL] Limit the length
of debug strings for query plans
This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 812ad55 [SPARK-26103][SQL] Limit the length of debug strings for query plans
812ad55 is described below
commit 812ad5546148d2194ab0e4230ee85b8f6a5be2fb
Author: Dave DeCaprio <da...@alum.mit.edu>
AuthorDate: Wed Mar 13 09:58:43 2019 -0700
[SPARK-26103][SQL] Limit the length of debug strings for query plans
## What changes were proposed in this pull request?
The PR puts a limit on the size of the debug string generated for a tree node. This helps fix out-of-memory errors when large plans produce huge debug strings. In addition to SPARK-26103, this should also address SPARK-23904 and SPARK-25380. An alternative solution was proposed in #23076, but that solution doesn't address all the cases that can produce a very large plan string. This limit applies only to calls to treeString that don't pass a Writer, which makes it play nicely with #22429, #23018 and #230 [...]
- A new configuration parameter called spark.sql.maxPlanStringLength was added to control the maximum length of plan strings.
- When plans are truncated, "..." followed by the number of omitted characters is appended to indicate that it isn't the full plan.
- A warning is logged each time a truncated plan string is generated. The warning explains what happened and how to adjust the limit.
## How was this patch tested?
Unit tests were added for the new length limit in StringConcat. A unit test for TreeNode was also added to check that a long plan is correctly truncated.
Closes #23169 from DaveDeCaprio/text-plan-size.
Lead-authored-by: Dave DeCaprio <da...@alum.mit.edu>
Co-authored-by: David DeCaprio <da...@alum.mit.edu>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
.../apache/spark/sql/catalyst/trees/TreeNode.scala | 4 +-
.../spark/sql/catalyst/util/StringUtils.scala | 58 ++++++++++++++++++----
.../org/apache/spark/sql/internal/SQLConf.scala | 13 +++++
.../spark/sql/catalyst/trees/TreeNodeSuite.scala | 29 ++++++++++-
.../spark/sql/catalyst/util/StringUtilsSuite.scala | 33 +++++++++---
.../spark/sql/execution/QueryExecution.scala | 14 +++---
6 files changed, 126 insertions(+), 25 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index d214ebb..72b1931 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
-import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
+import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -480,7 +480,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
verbose: Boolean,
addSuffix: Boolean = false,
maxFields: Int = SQLConf.get.maxToStringFields): String = {
- val concat = new StringConcat()
+ val concat = new PlanStringConcat()
treeString(concat.append, verbose, addSuffix, maxFields)
concat.toString
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index 643b83b..6118d8c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -17,14 +17,18 @@
package org.apache.spark.sql.catalyst.util
+import java.util.concurrent.atomic.AtomicBoolean
import java.util.regex.{Pattern, PatternSyntaxException}
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.types.UTF8String
-object StringUtils {
+object StringUtils extends Logging {
/**
* Validate and convert SQL 'like' pattern to a Java regular expression.
@@ -92,20 +96,29 @@ object StringUtils {
/**
* Concatenation of sequence of strings to final string with cheap append method
- * and one memory allocation for the final string.
+ * and one memory allocation for the final string. Can also bound the final size of
+ * the string.
*/
- class StringConcat {
- private val strings = new ArrayBuffer[String]
- private var length: Int = 0
+ class StringConcat(val maxLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+ protected val strings = new ArrayBuffer[String]
+ protected var length: Int = 0
+
+ def atLimit: Boolean = length >= maxLength
/**
* Appends a string and accumulates its length to allocate a string buffer for all
-   * appended strings once in the toString method.
+   * appended strings once in the toString method. Text beyond `maxLength` is dropped,
+   * but `length` keeps counting the full size of every non-null string passed in.
*/
def append(s: String): Unit = {
if (s != null) {
-      strings.append(s)
-      length += s.length
+      val sLen = s.length
+      if (!atLimit) {
+        val available = maxLength - length
+        val stringToAppend = if (available >= sLen) s else s.substring(0, available)
+        strings.append(stringToAppend)
+      }
+      length += sLen
}
}
@@ -114,9 +127,36 @@ object StringUtils {
* returns concatenated string.
*/
override def toString: String = {
- val result = new java.lang.StringBuilder(length)
+ val finalLength = if (atLimit) maxLength else length
+ val result = new java.lang.StringBuilder(finalLength)
strings.foreach(result.append)
result.toString
}
}
+
+  /**
+   * String concatenator for plan strings, capped at the configured maxPlanStringLength less
+   * 30 chars of headroom for the truncation note; logs a warning each time it truncates.
+   */
+  class PlanStringConcat extends StringConcat(Math.max(0, SQLConf.get.maxPlanStringLength - 30)) {
+    override def toString: String = {
+      if (atLimit) {
+        logWarning(
+          "Truncated the string representation of a plan since it was too long. The " +
+          s"plan had length ${length} and the maximum is ${maxLength}. This behavior " +
+          s"can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
+        val truncateMsg = if (maxLength == 0) {
+          s"Truncated plan of $length characters"
+        } else {
+          s"... ${length - maxLength} more characters"
+        }
+        val result = new java.lang.StringBuilder(maxLength + truncateMsg.length)
+        strings.foreach(result.append)
+        result.append(truncateMsg)
+        result.toString
+      } else {
+        super.toString
+      }
+    }
+  }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 193d311..20f4080 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1691,6 +1691,17 @@ object SQLConf {
.intConf
.createWithDefault(25)
+ val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
+ .doc("Maximum number of characters to output for a plan string. If the plan is " +
+ "longer, further output will be truncated. The default setting always generates a full " +
+ "plan. Set this to a lower value such as 8k if plan strings are taking up too much " +
+ "memory or are causing OutOfMemory errors in the driver or UI processes.")
+ .bytesConf(ByteUnit.BYTE)
+ .checkValue(i => i >= 0 && i <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, "Invalid " +
+ "value for 'spark.sql.maxPlanStringLength'. Length must be a valid string length " +
+ "(nonnegative and shorter than the maximum size).")
+ .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
+
val SET_COMMAND_REJECTS_SPARK_CORE_CONFS =
buildConf("spark.sql.legacy.setCommandRejectsSparkCoreConfs")
.internal()
@@ -2146,6 +2157,8 @@ class SQLConf extends Serializable with Logging {
def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
+ def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt
+
def setCommandRejectsSparkCoreConfs: Boolean =
getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index cb911d7..e7ad04f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -33,9 +33,10 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.dsl.expressions.DslString
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin}
+import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin, SQLHelper}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Union}
import org.apache.spark.sql.catalyst.plans.physical.{IdentityBroadcastMode, RoundRobinPartitioning, SinglePartition}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
@@ -81,7 +82,7 @@ case class SelfReferenceUDF(
def apply(key: String): Boolean = config.contains(key)
}
-class TreeNodeSuite extends SparkFunSuite {
+class TreeNodeSuite extends SparkFunSuite with SQLHelper {
test("top node changed") {
val after = Literal(1) transform { case Literal(1, _) => Literal(2) }
assert(after === Literal(2))
@@ -595,4 +596,28 @@ class TreeNodeSuite extends SparkFunSuite {
val expected = Coalesce(Stream(Literal(1), Literal(3)))
assert(result === expected)
}
+
+ test("treeString limits plan length") {
+ withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "200") {
+ val ds = (1 until 20).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) =>
+ Add(Literal(x), treeNode)
+ }
+
+ val planString = ds.treeString
+ logWarning("Plan string: " + planString)
+ assert(planString.endsWith(" more characters"))
+ assert(planString.length <= SQLConf.get.maxPlanStringLength)
+ }
+ }
+
+ test("treeString limit at zero") {
+ withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "0") {
+ val ds = (1 until 2).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) =>
+ Add(Literal(x), treeNode)
+ }
+
+ val planString = ds.treeString
+ assert(planString.startsWith("Truncated plan of"))
+ }
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
index 616ec12..63d3831 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
@@ -46,14 +46,35 @@ class StringUtilsSuite extends SparkFunSuite {
test("string concatenation") {
def concat(seq: String*): String = {
- seq.foldLeft(new StringConcat())((acc, s) => {acc.append(s); acc}).toString
+ seq.foldLeft(new StringConcat()) { (acc, s) => acc.append(s); acc }.toString
}
assert(new StringConcat().toString == "")
- assert(concat("") == "")
- assert(concat(null) == "")
- assert(concat("a") == "a")
- assert(concat("1", "2") == "12")
- assert(concat("abc", "\n", "123") == "abc\n123")
+ assert(concat("") === "")
+ assert(concat(null) === "")
+ assert(concat("a") === "a")
+ assert(concat("1", "2") === "12")
+ assert(concat("abc", "\n", "123") === "abc\n123")
+ }
+
+ test("string concatenation with limit") {
+ def concat(seq: String*): String = {
+ seq.foldLeft(new StringConcat(7)) { (acc, s) => acc.append(s); acc }.toString
+ }
+ assert(concat("under") === "under")
+ assert(concat("under", "over", "extra") === "underov")
+ assert(concat("underover") === "underov")
+ assert(concat("under", "ov") === "underov")
+ }
+
+ test("string concatenation return value") {
+ def checkLimit(s: String): Boolean = {
+ val sc = new StringConcat(7)
+ sc.append(s)
+ sc.atLimit
+ }
+ assert(!checkLimit("under"))
+ assert(checkLimit("1234567"))
+ assert(checkLimit("1234567890"))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 49d6acf..5d2710bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
+import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.internal.SQLConf
@@ -114,7 +114,7 @@ class QueryExecution(
ReuseSubquery(sparkSession.sessionState.conf))
def simpleString: String = withRedaction {
- val concat = new StringConcat()
+ val concat = new PlanStringConcat()
concat.append("== Physical Plan ==\n")
QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
concat.append("\n")
@@ -142,13 +142,13 @@ class QueryExecution(
}
override def toString: String = withRedaction {
- val concat = new StringConcat()
+ val concat = new PlanStringConcat()
writePlans(concat.append, SQLConf.get.maxToStringFields)
concat.toString
}
def stringWithStats: String = withRedaction {
- val concat = new StringConcat()
+ val concat = new PlanStringConcat()
val maxFields = SQLConf.get.maxToStringFields
// trigger to compute stats for logical plans
@@ -203,9 +203,11 @@ class QueryExecution(
val filePath = new Path(path)
val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
-
+ val append = (s: String) => {
+ writer.write(s)
+ }
try {
- writePlans(writer.write, maxFields)
+ writePlans(append, maxFields)
writer.write("\n== Whole Stage Codegen ==\n")
org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
} finally {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org