You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/03/13 16:59:22 UTC

[spark] branch master updated: [SPARK-26103][SQL] Limit the length of debug strings for query plans

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 812ad55  [SPARK-26103][SQL] Limit the length of debug strings for query plans
812ad55 is described below

commit 812ad5546148d2194ab0e4230ee85b8f6a5be2fb
Author: Dave DeCaprio <da...@alum.mit.edu>
AuthorDate: Wed Mar 13 09:58:43 2019 -0700

    [SPARK-26103][SQL] Limit the length of debug strings for query plans
    
    ## What changes were proposed in this pull request?
    
    The PR puts in a limit on the size of a debug string generated for a tree node.  Helps to fix out of memory errors when large plans have huge debug strings.   In addition to SPARK-26103, this should also address SPARK-23904 and SPARK-25380.  An alternative solution was proposed in #23076, but that solution doesn't address all the cases that can cause a large query.  This limit is only on calls to treeString that don't pass a Writer, which makes it play nicely with #22429, #23018 and #230 [...]
    
    - A new configuration parameter called spark.sql.maxPlanStringLength was added to control the length of the plans.
    - When plans are truncated, "..." is printed to indicate that it isn't a full plan
    - A warning is logged whenever a truncated plan is displayed. The warning explains what happened and how to adjust the limit.
    
    ## How was this patch tested?
    
    Unit tests were created for the new SizeLimitedWriter.  Also a unit test for TreeNode was created that checks that a long plan is correctly truncated.
    
    Closes #23169 from DaveDeCaprio/text-plan-size.
    
    Lead-authored-by: Dave DeCaprio <da...@alum.mit.edu>
    Co-authored-by: David DeCaprio <da...@alum.mit.edu>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../apache/spark/sql/catalyst/trees/TreeNode.scala |  4 +-
 .../spark/sql/catalyst/util/StringUtils.scala      | 58 ++++++++++++++++++----
 .../org/apache/spark/sql/internal/SQLConf.scala    | 13 +++++
 .../spark/sql/catalyst/trees/TreeNodeSuite.scala   | 29 ++++++++++-
 .../spark/sql/catalyst/util/StringUtilsSuite.scala | 33 +++++++++---
 .../spark/sql/execution/QueryExecution.scala       | 14 +++---
 6 files changed, 126 insertions(+), 25 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index d214ebb..72b1931 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
-import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
+import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -480,7 +480,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       verbose: Boolean,
       addSuffix: Boolean = false,
       maxFields: Int = SQLConf.get.maxToStringFields): String = {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
 
     treeString(concat.append, verbose, addSuffix, maxFields)
     concat.toString
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index 643b83b..6118d8c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -17,14 +17,18 @@
 
 package org.apache.spark.sql.catalyst.util
 
+import java.util.concurrent.atomic.AtomicBoolean
 import java.util.regex.{Pattern, PatternSyntaxException}
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.types.UTF8String
 
-object StringUtils {
+object StringUtils extends Logging {
 
   /**
    * Validate and convert SQL 'like' pattern to a Java regular expression.
@@ -92,20 +96,29 @@ object StringUtils {
 
   /**
    * Concatenation of sequence of strings to final string with cheap append method
-   * and one memory allocation for the final string.
+   * and one memory allocation for the final string.  Can also bound the final size of
+   * the string.
    */
-  class StringConcat {
-    private val strings = new ArrayBuffer[String]
-    private var length: Int = 0
+  class StringConcat(val maxLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+    protected val strings = new ArrayBuffer[String]
+    protected var length: Int = 0
+
+    def atLimit: Boolean = length >= maxLength
 
     /**
      * Appends a string and accumulates its length to allocate a string buffer for all
-     * appended strings once in the toString method.
+     * appended strings once in the toString method.  Once maxLength is reached, further
+     * text is dropped but its length is still counted; check atLimit for remaining room.
      */
     def append(s: String): Unit = {
       if (s != null) {
-        strings.append(s)
-        length += s.length
+        val sLen = s.length
+        if (!atLimit) {
+          val available = maxLength - length
+          val stringToAppend = if (available >= sLen) s else s.substring(0, available)
+          strings.append(stringToAppend)
+        }
+        length += sLen
       }
     }
 
@@ -114,9 +127,36 @@ object StringUtils {
      * returns concatenated string.
      */
     override def toString: String = {
-      val result = new java.lang.StringBuilder(length)
+      val finalLength = if (atLimit) maxLength else length
+      val result = new java.lang.StringBuilder(finalLength)
       strings.foreach(result.append)
       result.toString
     }
   }
+
+  /**
+   * A string concatenator for plan strings.  Uses length from a configured value, and
+   * logs a warning each time toString is called on a plan that reached the length limit.
+   */
+  class PlanStringConcat extends StringConcat(Math.max(0, SQLConf.get.maxPlanStringLength - 30)) {
+    override def toString: String = {
+      if (atLimit) {
+        logWarning(
+          "Truncated the string representation of a plan since it was too long. The " +
+            s"plan had length ${length} and the maximum is ${maxLength}. This behavior " +
+            s"can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
+        val truncateMsg = if (maxLength == 0) {
+          s"Truncated plan of $length characters"
+        } else {
+          s"... ${length - maxLength} more characters"
+        }
+        val result = new java.lang.StringBuilder(maxLength + truncateMsg.length)
+        strings.foreach(result.append)
+        result.append(truncateMsg)
+        result.toString
+      } else {
+        super.toString
+      }
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 193d311..20f4080 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1691,6 +1691,17 @@ object SQLConf {
     .intConf
     .createWithDefault(25)
 
+  val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
+    .doc("Maximum number of characters to output for a plan string.  If the plan is " +
+      "longer, further output will be truncated.  The default setting always generates a full " +
+      "plan.  Set this to a lower value such as 8k if plan strings are taking up too much " +
+      "memory or are causing OutOfMemory errors in the driver or UI processes.")
+    .bytesConf(ByteUnit.BYTE)
+    .checkValue(i => i >= 0 && i <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, "Invalid " +
+      "value for 'spark.sql.maxPlanStringLength'.  Length must be a valid string length " +
+      "(nonnegative and shorter than the maximum size).")
+    .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
+
   val SET_COMMAND_REJECTS_SPARK_CORE_CONFS =
     buildConf("spark.sql.legacy.setCommandRejectsSparkCoreConfs")
       .internal()
@@ -2146,6 +2157,8 @@ class SQLConf extends Serializable with Logging {
 
   def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
 
+  def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt
+
   def setCommandRejectsSparkCoreConfs: Boolean =
     getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS)
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index cb911d7..e7ad04f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -33,9 +33,10 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions.DslString
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin}
+import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin, SQLHelper}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Union}
 import org.apache.spark.sql.catalyst.plans.physical.{IdentityBroadcastMode, RoundRobinPartitioning, SinglePartition}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
 
@@ -81,7 +82,7 @@ case class SelfReferenceUDF(
   def apply(key: String): Boolean = config.contains(key)
 }
 
-class TreeNodeSuite extends SparkFunSuite {
+class TreeNodeSuite extends SparkFunSuite with SQLHelper {
   test("top node changed") {
     val after = Literal(1) transform { case Literal(1, _) => Literal(2) }
     assert(after === Literal(2))
@@ -595,4 +596,28 @@ class TreeNodeSuite extends SparkFunSuite {
     val expected = Coalesce(Stream(Literal(1), Literal(3)))
     assert(result === expected)
   }
+
+  test("treeString limits plan length") {
+    withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "200") {
+      val ds = (1 until 20).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) =>
+        Add(Literal(x), treeNode)
+      }
+
+      val planString = ds.treeString
+      logWarning("Plan string: " + planString)
+      assert(planString.endsWith(" more characters"))
+      assert(planString.length <= SQLConf.get.maxPlanStringLength)
+    }
+  }
+
+  test("treeString limit at zero") {
+    withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "0") {
+      val ds = (1 until 2).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) =>
+        Add(Literal(x), treeNode)
+      }
+
+      val planString = ds.treeString
+      assert(planString.startsWith("Truncated plan of"))
+    }
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
index 616ec12..63d3831 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
@@ -46,14 +46,35 @@ class StringUtilsSuite extends SparkFunSuite {
 
   test("string concatenation") {
     def concat(seq: String*): String = {
-      seq.foldLeft(new StringConcat())((acc, s) => {acc.append(s); acc}).toString
+      seq.foldLeft(new StringConcat()) { (acc, s) => acc.append(s); acc }.toString
     }
 
     assert(new StringConcat().toString == "")
-    assert(concat("") == "")
-    assert(concat(null) == "")
-    assert(concat("a") == "a")
-    assert(concat("1", "2") == "12")
-    assert(concat("abc", "\n", "123") == "abc\n123")
+    assert(concat("") === "")
+    assert(concat(null) === "")
+    assert(concat("a") === "a")
+    assert(concat("1", "2") === "12")
+    assert(concat("abc", "\n", "123") === "abc\n123")
+  }
+
+  test("string concatenation with limit") {
+    def concat(seq: String*): String = {
+      seq.foldLeft(new StringConcat(7)) { (acc, s) => acc.append(s); acc }.toString
+    }
+    assert(concat("under") === "under")
+    assert(concat("under", "over", "extra") === "underov")
+    assert(concat("underover") === "underov")
+    assert(concat("under", "ov") === "underov")
+  }
+
+  test("string concatenation return value") {
+    def checkLimit(s: String): Boolean = {
+      val sc = new StringConcat(7)
+      sc.append(s)
+      sc.atLimit
+    }
+    assert(!checkLimit("under"))
+    assert(checkLimit("1234567"))
+    assert(checkLimit("1234567890"))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 49d6acf..5d2710bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
+import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.internal.SQLConf
@@ -114,7 +114,7 @@ class QueryExecution(
     ReuseSubquery(sparkSession.sessionState.conf))
 
   def simpleString: String = withRedaction {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     concat.append("== Physical Plan ==\n")
     QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
     concat.append("\n")
@@ -142,13 +142,13 @@ class QueryExecution(
   }
 
   override def toString: String = withRedaction {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     writePlans(concat.append, SQLConf.get.maxToStringFields)
     concat.toString
   }
 
   def stringWithStats: String = withRedaction {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     val maxFields = SQLConf.get.maxToStringFields
 
     // trigger to compute stats for logical plans
@@ -203,9 +203,11 @@ class QueryExecution(
       val filePath = new Path(path)
       val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
       val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
-
+      val append = (s: String) => {
+        writer.write(s)
+      }
       try {
-        writePlans(writer.write, maxFields)
+        writePlans(append, maxFields)
         writer.write("\n== Whole Stage Codegen ==\n")
         org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
       } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org