Posted to commits@spark.apache.org by hv...@apache.org on 2018/12/27 10:13:40 UTC

[spark] branch master updated: [SPARK-26191][SQL] Control truncation of Spark plans via maxFields parameter

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a1c1dd3  [SPARK-26191][SQL] Control truncation of Spark plans via maxFields parameter
a1c1dd3 is described below

commit a1c1dd3484a4dcd7c38fe256e69dbaaaf10d1a92
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Thu Dec 27 11:13:16 2018 +0100

    [SPARK-26191][SQL] Control truncation of Spark plans via maxFields parameter
    
    ## What changes were proposed in this pull request?
    
    In the PR, I propose to add a `maxFields` parameter to all functions involved in creating the textual representation of Spark plans, such as `simpleString` and `verboseString`. The new parameter restricts the number of fields converted to truncated strings. Any elements beyond the limit are dropped and replaced by a `"... N more fields"` placeholder. The threshold is bumped up to `Int.MaxValue` for `toFile()`.
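    
    A minimal sketch of the truncation behavior (simplified from the `truncatedString` helper changed in this diff; the example outputs match the assertions added to `UtilSuite` below):
    
    ```scala
    def truncatedString[T](seq: Seq[T], start: String, sep: String, end: String, maxFields: Int): String = {
      if (seq.length > maxFields) {
        // Keep maxFields - 1 elements and summarize the rest in a placeholder.
        val numFields = math.max(0, maxFields - 1)
        seq.take(numFields).mkString(
          start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end)
      } else {
        seq.mkString(start, sep, end)
      }
    }
    
    truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2)  // "[1, ... 2 more fields]"
    truncatedString(Seq(1, 2, 3), "[", ", ", "]", 10) // "[1, 2, 3]"
    ```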
    
    ## How was this patch tested?
    
    Added a test to `QueryExecutionSuite` which checks how `maxFields` impacts the number of truncated fields in `LocalRelation`.
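    
    For reference, a minimal usage sketch of the new `toFile` parameter (the output paths are hypothetical; with the default of `Int.MaxValue` nothing is truncated in the dumped plans):
    
    ```scala
    val df = spark.range(10).selectExpr("id", "id * 2 AS doubled")
    // Dump all plans without truncation (default maxFields = Int.MaxValue).
    df.queryExecution.debug.toFile("/tmp/plans-full.txt")
    // Dump plans keeping at most 2 fields per truncated list.
    df.queryExecution.debug.toFile("/tmp/plans-short.txt", maxFields = 2)
    ```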
    
    Closes #23159 from MaxGekk/to-file-max-fields.
    
    Lead-authored-by: Maxim Gekk <ma...@gmail.com>
    Co-authored-by: Maxim Gekk <ma...@databricks.com>
    Signed-off-by: Herman van Hovell <hv...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  4 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  8 ++--
 .../spark/sql/catalyst/analysis/TypeCoercion.scala |  4 +-
 .../sql/catalyst/encoders/ExpressionEncoder.scala  |  8 ++--
 .../sql/catalyst/expressions/Expression.scala      |  6 +--
 .../catalyst/expressions/codegen/javaCode.scala    |  2 +-
 .../sql/catalyst/expressions/generators.scala      |  3 +-
 .../expressions/higherOrderFunctions.scala         |  4 +-
 .../spark/sql/catalyst/expressions/misc.scala      |  5 +-
 .../catalyst/expressions/namedExpressions.scala    |  4 +-
 .../spark/sql/catalyst/plans/QueryPlan.scala       |  4 +-
 .../sql/catalyst/plans/logical/LogicalPlan.scala   |  4 +-
 .../plans/logical/basicLogicalOperators.scala      |  8 ++--
 .../apache/spark/sql/catalyst/trees/TreeNode.scala | 56 +++++++++++++---------
 .../apache/spark/sql/catalyst/util/package.scala   | 10 ++--
 .../org/apache/spark/sql/types/StructType.scala    |  6 ++-
 .../expressions/aggregate/PercentileSuite.scala    |  2 +-
 .../org/apache/spark/sql/util/UtilSuite.scala      |  2 +-
 .../spark/sql/execution/DataSourceScanExec.scala   | 12 ++---
 .../apache/spark/sql/execution/ExistingRDD.scala   |  6 +--
 .../spark/sql/execution/QueryExecution.scala       | 21 ++++----
 .../apache/spark/sql/execution/SparkPlanInfo.scala |  6 ++-
 .../sql/execution/WholeStageCodegenExec.scala      | 28 ++++++++---
 .../execution/aggregate/HashAggregateExec.scala    | 12 ++---
 .../aggregate/ObjectHashAggregateExec.scala        | 12 ++---
 .../execution/aggregate/SortAggregateExec.scala    | 12 ++---
 .../sql/execution/basicPhysicalOperators.scala     |  4 +-
 .../sql/execution/columnar/InMemoryRelation.scala  |  4 +-
 .../execution/datasources/LogicalRelation.scala    |  4 +-
 .../datasources/SaveIntoDataSourceCommand.scala    |  2 +-
 .../spark/sql/execution/datasources/ddl.scala      |  2 +-
 .../datasources/v2/DataSourceV2Relation.scala      |  8 ++--
 .../datasources/v2/DataSourceV2ScanExec.scala      |  4 +-
 .../v2/DataSourceV2StreamingScanExec.scala         |  2 +-
 .../datasources/v2/DataSourceV2StringFormat.scala  |  6 +--
 .../apache/spark/sql/execution/debug/package.scala |  3 +-
 .../org/apache/spark/sql/execution/limit.scala     |  6 +--
 .../execution/streaming/MicroBatchExecution.scala  |  6 ++-
 .../streaming/continuous/ContinuousExecution.scala |  7 +--
 .../spark/sql/execution/streaming/memory.scala     |  5 +-
 .../org/apache/spark/sql/execution/subquery.scala  |  2 +-
 .../spark/sql/execution/QueryExecutionSuite.scala  | 13 +++++
 .../execution/CreateHiveTableAsSelectCommand.scala |  2 +-
 43 files changed, 203 insertions(+), 126 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7770531..198645d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -979,7 +979,7 @@ class Analyzer(
         a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
 
       case q: LogicalPlan =>
-        logTrace(s"Attempting to resolve ${q.simpleString}")
+        logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
         q.mapExpressions(resolveExpressionTopDown(_, q))
     }
 
@@ -1777,7 +1777,7 @@ class Analyzer(
 
       case p if p.expressions.exists(hasGenerator) =>
         throw new AnalysisException("Generators are not supported outside the SELECT clause, but " +
-          "got: " + p.simpleString)
+          "got: " + p.simpleString(SQLConf.get.maxToStringFields))
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 88d41e8..c28a978 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -303,7 +304,7 @@ trait CheckAnalysis extends PredicateHelper {
             val missingAttributes = o.missingInput.mkString(",")
             val input = o.inputSet.mkString(",")
             val msgForMissingAttributes = s"Resolved attribute(s) $missingAttributes missing " +
-              s"from $input in operator ${operator.simpleString}."
+              s"from $input in operator ${operator.simpleString(SQLConf.get.maxToStringFields)}."
 
             val resolver = plan.conf.resolver
             val attrsWithSameName = o.missingInput.filter { missing =>
@@ -368,7 +369,7 @@ trait CheckAnalysis extends PredicateHelper {
               s"""nondeterministic expressions are only allowed in
                  |Project, Filter, Aggregate or Window, found:
                  | ${o.expressions.map(_.sql).mkString(",")}
-                 |in operator ${operator.simpleString}
+                 |in operator ${operator.simpleString(SQLConf.get.maxToStringFields)}
                """.stripMargin)
 
           case _: UnresolvedHint =>
@@ -380,7 +381,8 @@ trait CheckAnalysis extends PredicateHelper {
     }
     extendedCheckRules.foreach(_(plan))
     plan.foreachUp {
-      case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}")
+      case o if !o.resolved =>
+        failAnalysis(s"unresolved operator ${o.simpleString(SQLConf.get.maxToStringFields)}")
       case _ =>
     }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 1706b3e..b19aa50 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -1069,8 +1069,8 @@ trait TypeCoercionRule extends Rule[LogicalPlan] with Logging {
             // Leave the same if the dataTypes match.
             case Some(newType) if a.dataType == newType.dataType => a
             case Some(newType) =>
-              logDebug(
-                s"Promoting $a from ${a.dataType} to ${newType.dataType} in ${q.simpleString}")
+              logDebug(s"Promoting $a from ${a.dataType} to ${newType.dataType} in " +
+                s" ${q.simpleString(SQLConf.get.maxToStringFields)}")
               newType
           }
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index fbf0bd6..da5c1fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection
 import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, InitializeJavaBean, Invoke, NewInstance}
 import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts
 import org.apache.spark.sql.catalyst.plans.logical.{CatalystSerde, DeserializeToObject, LocalRelation}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ObjectType, StringType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -323,8 +324,8 @@ case class ExpressionEncoder[T](
     extractProjection(inputRow)
   } catch {
     case e: Exception =>
-      throw new RuntimeException(
-        s"Error while encoding: $e\n${serializer.map(_.simpleString).mkString("\n")}", e)
+      throw new RuntimeException(s"Error while encoding: $e\n" +
+          s"${serializer.map(_.simpleString(SQLConf.get.maxToStringFields)).mkString("\n")}", e)
   }
 
   /**
@@ -336,7 +337,8 @@ case class ExpressionEncoder[T](
     constructProjection(row).get(0, ObjectType(clsTag.runtimeClass)).asInstanceOf[T]
   } catch {
     case e: Exception =>
-      throw new RuntimeException(s"Error while decoding: $e\n${deserializer.simpleString}", e)
+      throw new RuntimeException(s"Error while decoding: $e\n" +
+        s"${deserializer.simpleString(SQLConf.get.maxToStringFields)}", e)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index c89c227..d5d1195 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -259,12 +259,12 @@ abstract class Expression extends TreeNode[Expression] {
 
   // Marks this as final, Expression.verboseString should never be called, and thus shouldn't be
   // overridden by concrete classes.
-  final override def verboseString: String = simpleString
+  final override def verboseString(maxFields: Int): String = simpleString(maxFields)
 
-  override def simpleString: String = toString
+  override def simpleString(maxFields: Int): String = toString
 
   override def toString: String = prettyName + truncatedString(
-    flatArguments.toSeq, "(", ", ", ")")
+    flatArguments.toSeq, "(", ", ", ")", SQLConf.get.maxToStringFields)
 
   /**
    * Returns SQL representation of this expression.  For expressions extending [[NonSQLExpression]],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
index 17d4a0d..17fff64 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
@@ -197,7 +197,7 @@ trait Block extends TreeNode[Block] with JavaCode {
     case _ => code"$this\n$other"
   }
 
-  override def verboseString: String = toString
+  override def verboseString(maxFields: Int): String = toString
 }
 
 object Block {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 9c74fdf..6b6da1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -101,7 +102,7 @@ case class UserDefinedGenerator(
     inputRow = new InterpretedProjection(children)
     convertToScala = {
       val inputSchema = StructType(children.map { e =>
-        StructField(e.simpleString, e.dataType, nullable = true)
+        StructField(e.simpleString(SQLConf.get.maxToStringFields), e.dataType, nullable = true)
       })
       CatalystTypeConverters.createToScalaConverter(inputSchema)
     }.asInstanceOf[InternalRow => Row]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
index 7141b6e..e6cc11d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
@@ -76,7 +76,9 @@ case class NamedLambdaVariable(
 
   override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
 
-  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+  override def simpleString(maxFields: Int): String = {
+    s"lambda $name#${exprId.id}: ${dataType.simpleString(maxFields)}"
+  }
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index 0cdeda9..1f1decc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.util.RandomUUIDGenerator
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -40,7 +41,7 @@ case class PrintToStderr(child: Expression) extends UnaryExpression {
     input
   }
 
-  private val outputPrefix = s"Result of ${child.simpleString} is "
+  private val outputPrefix = s"Result of ${child.simpleString(SQLConf.get.maxToStringFields)} is "
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val outputPrefixField = ctx.addReferenceObj("outputPrefix", outputPrefix)
@@ -72,7 +73,7 @@ case class AssertTrue(child: Expression) extends UnaryExpression with ImplicitCa
 
   override def prettyName: String = "assert_true"
 
-  private val errMsg = s"'${child.simpleString}' is not true!"
+  private val errMsg = s"'${child.simpleString(SQLConf.get.maxToStringFields)}' is not true!"
 
   override def eval(input: InternalRow) : Any = {
     val v = child.eval(input)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 02b48f9..131459b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -330,7 +330,9 @@ case class AttributeReference(
 
   // Since the expression id is not in the first constructor it is missing from the default
   // tree string.
-  override def simpleString: String = s"$name#${exprId.id}: ${dataType.simpleString}"
+  override def simpleString(maxFields: Int): String = {
+    s"$name#${exprId.id}: ${dataType.simpleString(maxFields)}"
+  }
 
   override def sql: String = {
     val qualifierPrefix = if (qualifier.nonEmpty) qualifier.mkString(".") + "." else ""
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index ca0cea6..125181f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -172,9 +172,9 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    */
   protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""
 
-  override def simpleString: String = statePrefix + super.simpleString
+  override def simpleString(maxFields: Int): String = statePrefix + super.simpleString(maxFields)
 
-  override def verboseString: String = simpleString
+  override def verboseString(maxFields: Int): String = simpleString(maxFields)
 
   /**
    * All the subqueries of current plan.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 3ad2ee6..51e0f4b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -36,8 +36,8 @@ abstract class LogicalPlan
   /** Returns true if this subtree has data from a streaming data source. */
   def isStreaming: Boolean = children.exists(_.isStreaming == true)
 
-  override def verboseStringWithSuffix: String = {
-    super.verboseString + statsCache.map(", " + _.toString).getOrElse("")
+  override def verboseStringWithSuffix(maxFields: Int): String = {
+    super.verboseString(maxFields) + statsCache.map(", " + _.toString).getOrElse("")
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index a26ec4e..d8b3a4a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -468,7 +468,7 @@ case class View(
 
   override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
 
-  override def simpleString: String = {
+  override def simpleString(maxFields: Int): String = {
     s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})"
   }
 }
@@ -484,8 +484,8 @@ case class View(
 case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
-  override def simpleString: String = {
-    val cteAliases = truncatedString(cteRelations.map(_._1), "[", ", ", "]")
+  override def simpleString(maxFields: Int): String = {
+    val cteAliases = truncatedString(cteRelations.map(_._1), "[", ", ", "]", maxFields)
     s"CTE $cteAliases"
   }
 
@@ -557,7 +557,7 @@ case class Range(
 
   override def newInstance(): Range = copy(output = output.map(_.newInstance()))
 
-  override def simpleString: String = {
+  override def simpleString(maxFields: Int): String = {
     s"Range ($start, $end, step=$step, splits=$numSlices)"
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 2e9f9f5..21e59bb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
 
@@ -433,17 +434,17 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   private lazy val allChildren: Set[TreeNode[_]] = (children ++ innerChildren).toSet[TreeNode[_]]
 
   /** Returns a string representing the arguments to this node, minus any children */
-  def argString: String = stringArgs.flatMap {
+  def argString(maxFields: Int): String = stringArgs.flatMap {
     case tn: TreeNode[_] if allChildren.contains(tn) => Nil
     case Some(tn: TreeNode[_]) if allChildren.contains(tn) => Nil
-    case Some(tn: TreeNode[_]) => tn.simpleString :: Nil
-    case tn: TreeNode[_] => tn.simpleString :: Nil
+    case Some(tn: TreeNode[_]) => tn.simpleString(maxFields) :: Nil
+    case tn: TreeNode[_] => tn.simpleString(maxFields) :: Nil
     case seq: Seq[Any] if seq.toSet.subsetOf(allChildren.asInstanceOf[Set[Any]]) => Nil
     case iter: Iterable[_] if iter.isEmpty => Nil
-    case seq: Seq[_] => truncatedString(seq, "[", ", ", "]") :: Nil
-    case set: Set[_] => truncatedString(set.toSeq, "{", ", ", "}") :: Nil
+    case seq: Seq[_] => truncatedString(seq, "[", ", ", "]", maxFields) :: Nil
+    case set: Set[_] => truncatedString(set.toSeq, "{", ", ", "}", maxFields) :: Nil
     case array: Array[_] if array.isEmpty => Nil
-    case array: Array[_] => truncatedString(array, "[", ", ", "]") :: Nil
+    case array: Array[_] => truncatedString(array, "[", ", ", "]", maxFields) :: Nil
     case null => Nil
     case None => Nil
     case Some(null) => Nil
@@ -456,24 +457,33 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     case other => other :: Nil
   }.mkString(", ")
 
-  /** ONE line description of this node. */
-  def simpleString: String = s"$nodeName $argString".trim
+  /**
+   * ONE line description of this node.
+   * @param maxFields Maximum number of fields that will be converted to strings.
+   *                  Any elements beyond the limit will be dropped.
+   */
+  def simpleString(maxFields: Int): String = {
+    s"$nodeName ${argString(maxFields)}".trim
+  }
 
   /** ONE line description of this node with more information */
-  def verboseString: String
+  def verboseString(maxFields: Int): String
 
   /** ONE line description of this node with some suffix information */
-  def verboseStringWithSuffix: String = verboseString
+  def verboseStringWithSuffix(maxFields: Int): String = verboseString(maxFields)
 
   override def toString: String = treeString
 
   /** Returns a string representation of the nodes in this tree */
   def treeString: String = treeString(verbose = true)
 
-  def treeString(verbose: Boolean, addSuffix: Boolean = false): String = {
+  def treeString(
+      verbose: Boolean,
+      addSuffix: Boolean = false,
+      maxFields: Int = SQLConf.get.maxToStringFields): String = {
     val writer = new StringBuilderWriter()
     try {
-      treeString(writer, verbose, addSuffix)
+      treeString(writer, verbose, addSuffix, maxFields)
       writer.toString
     } finally {
       writer.close()
@@ -483,8 +493,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   def treeString(
       writer: Writer,
       verbose: Boolean,
-      addSuffix: Boolean): Unit = {
-    generateTreeString(0, Nil, writer, verbose, "", addSuffix)
+      addSuffix: Boolean,
+      maxFields: Int): Unit = {
+    generateTreeString(0, Nil, writer, verbose, "", addSuffix, maxFields)
   }
 
   /**
@@ -550,7 +561,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       writer: Writer,
       verbose: Boolean,
       prefix: String = "",
-      addSuffix: Boolean = false): Unit = {
+      addSuffix: Boolean = false,
+      maxFields: Int): Unit = {
 
     if (depth > 0) {
       lastChildren.init.foreach { isLast =>
@@ -560,9 +572,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     }
 
     val str = if (verbose) {
-      if (addSuffix) verboseStringWithSuffix else verboseString
+      if (addSuffix) verboseStringWithSuffix(maxFields) else verboseString(maxFields)
     } else {
-      simpleString
+      simpleString(maxFields)
     }
     writer.write(prefix)
     writer.write(str)
@@ -571,17 +583,17 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     if (innerChildren.nonEmpty) {
       innerChildren.init.foreach(_.generateTreeString(
         depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
-        addSuffix = addSuffix))
+        addSuffix = addSuffix, maxFields = maxFields))
       innerChildren.last.generateTreeString(
         depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
-        addSuffix = addSuffix)
+        addSuffix = addSuffix, maxFields = maxFields)
     }
 
     if (children.nonEmpty) {
       children.init.foreach(_.generateTreeString(
-        depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix))
+        depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix, maxFields))
       children.last.generateTreeString(
-        depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix)
+        depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix, maxFields)
     }
   }
 
@@ -664,7 +676,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       t.forall(_.isInstanceOf[Partitioning]) || t.forall(_.isInstanceOf[DataType]) =>
       JArray(t.map(parseToJson).toList)
     case t: Seq[_] if t.length > 0 && t.head.isInstanceOf[String] =>
-      JString(truncatedString(t, "[", ", ", "]"))
+      JString(truncatedString(t, "[", ", ", "]", SQLConf.get.maxToStringFields))
     case t: Seq[_] => JNull
     case m: Map[_, _] => JNull
     // if it's a scala object, we can simply keep the full class path.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 277584b..7f5860e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -184,14 +184,14 @@ package object util extends Logging {
       start: String,
       sep: String,
       end: String,
-      maxNumFields: Int = SQLConf.get.maxToStringFields): String = {
-    if (seq.length > maxNumFields) {
+      maxFields: Int): String = {
+    if (seq.length > maxFields) {
       if (truncationWarningPrinted.compareAndSet(false, true)) {
         logWarning(
           "Truncated the string representation of a plan since it was too large. This " +
             s"behavior can be adjusted by setting '${SQLConf.MAX_TO_STRING_FIELDS.key}'.")
       }
-      val numFields = math.max(0, maxNumFields - 1)
+      val numFields = math.max(0, maxFields - 1)
       seq.take(numFields).mkString(
         start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end)
     } else {
@@ -200,7 +200,9 @@ package object util extends Logging {
   }
 
   /** Shorthand for calling truncatedString() without start or end strings. */
-  def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "")
+  def truncatedString[T](seq: Seq[T], sep: String, maxFields: Int): String = {
+    truncatedString(seq, "", sep, "", maxFields)
+  }
 
   /* FIX ME
   implicit class debugLogging(a: Any) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index e01d7c5..d563276 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.Stable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser}
 import org.apache.spark.sql.catalyst.util.{quoteIdentifier, truncatedString}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * A [[StructType]] object can be constructed by
@@ -343,7 +344,10 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
 
   override def simpleString: String = {
     val fieldTypes = fields.view.map(field => s"${field.name}:${field.dataType.simpleString}")
-    truncatedString(fieldTypes, "struct<", ",", ">")
+    truncatedString(
+      fieldTypes,
+      "struct<", ",", ">",
+      SQLConf.get.maxToStringFields)
   }
 
   override def catalogString: String = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala
index 63c7b42..0e0c8e1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala
@@ -215,7 +215,7 @@ class PercentileSuite extends SparkFunSuite {
       val percentile2 = new Percentile(child, percentage)
       assertEqual(percentile2.checkInputDataTypes(),
         TypeCheckFailure(s"Percentage(s) must be between 0.0 and 1.0, " +
-        s"but got ${percentage.simpleString}"))
+        s"but got ${percentage.simpleString(100)}"))
     }
 
     val nonFoldablePercentage = Seq(NonFoldableLiteral(0.5),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/UtilSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/UtilSuite.scala
index 9c16202..d95de71 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/UtilSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/UtilSuite.scala
@@ -26,6 +26,6 @@ class UtilSuite extends SparkFunSuite {
     assert(truncatedString(Seq(1, 2), "[", ", ", "]", 2) == "[1, 2]")
     assert(truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2) == "[1, ... 2 more fields]")
     assert(truncatedString(Seq(1, 2, 3), "[", ", ", "]", -5) == "[, ... 3 more fields]")
-    assert(truncatedString(Seq(1, 2, 3), ", ") == "1, 2, 3")
+    assert(truncatedString(Seq(1, 2, 3), ", ", 10) == "1, 2, 3")
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 322ffff..1d7dd73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -52,19 +52,19 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
   // Metadata that describes more details of this scan.
   protected def metadata: Map[String, String]
 
-  override def simpleString: String = {
+  override def simpleString(maxFields: Int): String = {
     val metadataEntries = metadata.toSeq.sorted.map {
       case (key, value) =>
         key + ": " + StringUtils.abbreviate(redact(value), 100)
     }
-    val metadataStr = truncatedString(metadataEntries, " ", ", ", "")
-    s"$nodeNamePrefix$nodeName${truncatedString(output, "[", ",", "]")}$metadataStr"
+    val metadataStr = truncatedString(metadataEntries, " ", ", ", "", maxFields)
+    s"$nodeNamePrefix$nodeName${truncatedString(output, "[", ",", "]", maxFields)}$metadataStr"
   }
 
-  override def verboseString: String = redact(super.verboseString)
+  override def verboseString(maxFields: Int): String = redact(super.verboseString(maxFields))
 
-  override def treeString(verbose: Boolean, addSuffix: Boolean): String = {
-    redact(super.treeString(verbose, addSuffix))
+  override def treeString(verbose: Boolean, addSuffix: Boolean, maxFields: Int): String = {
+    redact(super.treeString(verbose, addSuffix, maxFields))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 49fb288..981ecae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -79,7 +79,7 @@ case class ExternalRDDScanExec[T](
     }
   }
 
-  override def simpleString: String = {
+  override def simpleString(maxFields: Int): String = {
     s"$nodeName${output.mkString("[", ",", "]")}"
   }
 }
@@ -156,8 +156,8 @@ case class RDDScanExec(
     }
   }
 
-  override def simpleString: String = {
-    s"$nodeName${truncatedString(output, "[", ",", "]")}"
+  override def simpleString(maxFields: Int): String = {
+    s"$nodeName${truncatedString(output, "[", ",", "]", maxFields)}"
   }
 
   // Input can be InternalRow, has to be turned into UnsafeRows.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index eef5a3f..9b8d2e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
 import org.apache.spark.util.Utils
 
@@ -208,27 +209,27 @@ class QueryExecution(
     }
   }
 
-  private def writePlans(writer: Writer): Unit = {
+  private def writePlans(writer: Writer, maxFields: Int): Unit = {
     val (verbose, addSuffix) = (true, false)
 
     writer.write("== Parsed Logical Plan ==\n")
-    writeOrError(writer)(logical.treeString(_, verbose, addSuffix))
+    writeOrError(writer)(logical.treeString(_, verbose, addSuffix, maxFields))
     writer.write("\n== Analyzed Logical Plan ==\n")
     val analyzedOutput = stringOrError(truncatedString(
-      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", "))
+      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields))
     writer.write(analyzedOutput)
     writer.write("\n")
-    writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix))
+    writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix, maxFields))
     writer.write("\n== Optimized Logical Plan ==\n")
-    writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix))
+    writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix, maxFields))
     writer.write("\n== Physical Plan ==\n")
-    writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix))
+    writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix, maxFields))
   }
 
   override def toString: String = withRedaction {
     val writer = new StringBuilderWriter()
     try {
-      writePlans(writer)
+      writePlans(writer, SQLConf.get.maxToStringFields)
       writer.toString
     } finally {
       writer.close()
@@ -280,14 +281,16 @@ class QueryExecution(
 
     /**
      * Dumps debug information about query execution into the specified file.
+     *
+     * @param maxFields maximum number of fields converted to string representation.
      */
-    def toFile(path: String): Unit = {
+    def toFile(path: String, maxFields: Int = Int.MaxValue): Unit = {
       val filePath = new Path(path)
       val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
       val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
 
       try {
-        writePlans(writer)
+        writePlans(writer, maxFields)
         writer.write("\n== Whole Stage Codegen ==\n")
         org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan)
       } finally {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 59ffd16..f554ff0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * :: DeveloperApi ::
@@ -62,7 +63,10 @@ private[execution] object SparkPlanInfo {
       case fileScan: FileSourceScanExec => fileScan.metadata
       case _ => Map[String, String]()
     }
-    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
+    new SparkPlanInfo(
+      plan.nodeName,
+      plan.simpleString(SQLConf.get.maxToStringFields),
+      children.map(fromSparkPlan),
       metadata, metrics)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index fbda0d8..f4927de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -87,7 +87,7 @@ trait CodegenSupport extends SparkPlan {
     this.parent = parent
     ctx.freshNamePrefix = variablePrefix
     s"""
-       |${ctx.registerComment(s"PRODUCE: ${this.simpleString}")}
+       |${ctx.registerComment(s"PRODUCE: ${this.simpleString(SQLConf.get.maxToStringFields)}")}
        |${doProduce(ctx)}
      """.stripMargin
   }
@@ -188,7 +188,7 @@ trait CodegenSupport extends SparkPlan {
       parent.doConsume(ctx, inputVars, rowVar)
     }
     s"""
-       |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
+       |${ctx.registerComment(s"CONSUME: ${parent.simpleString(SQLConf.get.maxToStringFields)}")}
        |$evaluated
        |$consumeFunc
      """.stripMargin
@@ -496,8 +496,16 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
       writer: Writer,
       verbose: Boolean,
       prefix: String = "",
-      addSuffix: Boolean = false): Unit = {
-    child.generateTreeString(depth, lastChildren, writer, verbose, prefix = "", addSuffix = false)
+      addSuffix: Boolean = false,
+      maxFields: Int): Unit = {
+    child.generateTreeString(
+      depth,
+      lastChildren,
+      writer,
+      verbose,
+      prefix = "",
+      addSuffix = false,
+      maxFields)
   }
 
   override def needCopyResult: Boolean = false
@@ -772,8 +780,16 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
       writer: Writer,
       verbose: Boolean,
       prefix: String = "",
-      addSuffix: Boolean = false): Unit = {
-    child.generateTreeString(depth, lastChildren, writer, verbose, s"*($codegenStageId) ", false)
+      addSuffix: Boolean = false,
+      maxFields: Int): Unit = {
+    child.generateTreeString(
+      depth,
+      lastChildren,
+      writer,
+      verbose,
+      s"*($codegenStageId) ",
+      false,
+      maxFields)
   }
 
   override def needStopCheck: Boolean = true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 4827f83..2355d30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -922,18 +922,18 @@ case class HashAggregateExec(
      """
   }
 
-  override def verboseString: String = toString(verbose = true)
+  override def verboseString(maxFields: Int): String = toString(verbose = true, maxFields)
 
-  override def simpleString: String = toString(verbose = false)
+  override def simpleString(maxFields: Int): String = toString(verbose = false, maxFields)
 
-  private def toString(verbose: Boolean): String = {
+  private def toString(verbose: Boolean, maxFields: Int): String = {
     val allAggregateExpressions = aggregateExpressions
 
     testFallbackStartsAt match {
       case None =>
-        val keyString = truncatedString(groupingExpressions, "[", ", ", "]")
-        val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]")
-        val outputString = truncatedString(output, "[", ", ", "]")
+        val keyString = truncatedString(groupingExpressions, "[", ", ", "]", maxFields)
+        val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields)
+        val outputString = truncatedString(output, "[", ", ", "]", maxFields)
         if (verbose) {
           s"HashAggregate(keys=$keyString, functions=$functionString, output=$outputString)"
         } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
index 7145bb0..bd52c63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
@@ -137,15 +137,15 @@ case class ObjectHashAggregateExec(
     }
   }
 
-  override def verboseString: String = toString(verbose = true)
+  override def verboseString(maxFields: Int): String = toString(verbose = true, maxFields)
 
-  override def simpleString: String = toString(verbose = false)
+  override def simpleString(maxFields: Int): String = toString(verbose = false, maxFields)
 
-  private def toString(verbose: Boolean): String = {
+  private def toString(verbose: Boolean, maxFields: Int): String = {
     val allAggregateExpressions = aggregateExpressions
-    val keyString = truncatedString(groupingExpressions, "[", ", ", "]")
-    val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]")
-    val outputString = truncatedString(output, "[", ", ", "]")
+    val keyString = truncatedString(groupingExpressions, "[", ", ", "]", maxFields)
+    val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields)
+    val outputString = truncatedString(output, "[", ", ", "]", maxFields)
     if (verbose) {
       s"ObjectHashAggregate(keys=$keyString, functions=$functionString, output=$outputString)"
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index d732b90..7ab6ecc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -107,16 +107,16 @@ case class SortAggregateExec(
     }
   }
 
-  override def simpleString: String = toString(verbose = false)
+  override def simpleString(maxFields: Int): String = toString(verbose = false, maxFields)
 
-  override def verboseString: String = toString(verbose = true)
+  override def verboseString(maxFields: Int): String = toString(verbose = true, maxFields)
 
-  private def toString(verbose: Boolean): String = {
+  private def toString(verbose: Boolean, maxFields: Int): String = {
     val allAggregateExpressions = aggregateExpressions
 
-    val keyString = truncatedString(groupingExpressions, "[", ", ", "]")
-    val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]")
-    val outputString = truncatedString(output, "[", ", ", "]")
+    val keyString = truncatedString(groupingExpressions, "[", ", ", "]", maxFields)
+    val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields)
+    val outputString = truncatedString(output, "[", ", ", "]", maxFields)
     if (verbose) {
       s"SortAggregate(key=$keyString, functions=$functionString, output=$outputString)"
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 09effe0..2570b36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -586,7 +586,9 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
       }
   }
 
-  override def simpleString: String = s"Range ($start, $end, step=$step, splits=$numSlices)"
+  override def simpleString(maxFields: Int): String = {
+    s"Range ($start, $end, step=$step, splits=$numSlices)"
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 73eb65f..4109d99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -209,6 +209,6 @@ case class InMemoryRelation(
 
   override protected def otherCopyArgs: Seq[AnyRef] = Seq(statsOfPlanToCache)
 
-  override def simpleString: String =
-    s"InMemoryRelation [${truncatedString(output, ", ")}], ${cacheBuilder.storageLevel}"
+  override def simpleString(maxFields: Int): String =
+    s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}], ${cacheBuilder.storageLevel}"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 1023572..db3604f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -63,7 +63,9 @@ case class LogicalRelation(
     case _ =>  // Do nothing.
   }
 
-  override def simpleString: String = s"Relation[${truncatedString(output, ",")}] $relation"
+  override def simpleString(maxFields: Int): String = {
+    s"Relation[${truncatedString(output, ",", maxFields)}] $relation"
+  }
 }
 
 object LogicalRelation {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
index 00b1b5d..f29e786 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
@@ -48,7 +48,7 @@ case class SaveIntoDataSourceCommand(
     Seq.empty[Row]
   }
 
-  override def simpleString: String = {
+  override def simpleString(maxFields: Int): String = {
     val redacted = SQLConf.get.redactOptions(options)
     s"SaveIntoDataSourceCommand ${dataSource}, ${redacted}, ${mode}"
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index fdc5e85..042320e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -68,7 +68,7 @@ case class CreateTempViewUsing(
       s"Temporary view '$tableIdent' should not have specified a database")
   }
 
-  override def argString: String = {
+  override def argString(maxFields: Int): String = {
     s"[tableIdent:$tableIdent " +
       userSpecifiedSchema.map(_ + " ").getOrElse("") +
       s"replace:$replace " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 0a6b0af..7bf2b8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -52,8 +52,8 @@ case class DataSourceV2Relation(
 
   override def name: String = table.name()
 
-  override def simpleString: String = {
-    s"RelationV2${truncatedString(output, "[", ", ", "]")} $name"
+  override def simpleString(maxFields: Int): String = {
+    s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
   }
 
   def newWriteSupport(): BatchWriteSupport = source.createWriteSupport(options, schema)
@@ -96,7 +96,9 @@ case class StreamingDataSourceV2Relation(
 
   override def isStreaming: Boolean = true
 
-  override def simpleString: String = "Streaming RelationV2 " + metadataString
+  override def simpleString(maxFields: Int): String = {
+    "Streaming RelationV2 " + metadataString(maxFields)
+  }
 
   override def pushedFilters: Seq[Expression] = Nil
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 725bcc3..53e4e77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -35,8 +35,8 @@ case class DataSourceV2ScanExec(
     @transient batch: Batch)
   extends LeafExecNode with ColumnarBatchScan {
 
-  override def simpleString: String = {
-    s"ScanV2${truncatedString(output, "[", ", ", "]")} $scanDesc"
+  override def simpleString(maxFields: Int): String = {
+    s"ScanV2${truncatedString(output, "[", ", ", "]", maxFields)} $scanDesc"
   }
 
   // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala
index c872940..be75fe4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala
@@ -42,7 +42,7 @@ case class DataSourceV2StreamingScanExec(
     @transient scanConfig: ScanConfig)
   extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
 
-  override def simpleString: String = "ScanV2 " + metadataString
+  override def simpleString(maxFields: Int): String = "ScanV2 " + metadataString(maxFields)
 
   // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
   override def equals(other: Any): Boolean = other match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
index e829f62..f11703c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
@@ -59,7 +59,7 @@ trait DataSourceV2StringFormat {
     case _ => Utils.getSimpleName(source.getClass)
   }
 
-  def metadataString: String = {
+  def metadataString(maxFields: Int): String = {
     val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
 
     if (pushedFilters.nonEmpty) {
@@ -73,12 +73,12 @@ trait DataSourceV2StringFormat {
       }.mkString("[", ",", "]")
     }
 
-    val outputStr = truncatedString(output, "[", ", ", "]")
+    val outputStr = truncatedString(output, "[", ", ", "]", maxFields)
 
     val entriesStr = if (entries.nonEmpty) {
       truncatedString(entries.map {
         case (key, value) => key + ": " + StringUtils.abbreviate(value, 100)
-      }, " (", ", ", ")")
+      }, " (", ", ", ")", maxFields)
     } else {
       ""
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 3511cef..ae8197f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, Codegen
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
 
@@ -216,7 +217,7 @@ package object debug {
     val columnStats: Array[ColumnMetrics] = Array.fill(child.output.size)(new ColumnMetrics())
 
     def dumpStats(): Unit = {
-      debugPrint(s"== ${child.simpleString} ==")
+      debugPrint(s"== ${child.simpleString(SQLConf.get.maxToStringFields)} ==")
       debugPrint(s"Tuples output: ${tupleCount.value}")
       child.output.zip(columnStats).foreach { case (attr, metric) =>
         // This is called on driver. All accumulator updates have a fixed value. So it's safe to use
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index bfaf080..56973af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -198,9 +198,9 @@ case class TakeOrderedAndProjectExec(
 
   override def outputPartitioning: Partitioning = SinglePartition
 
-  override def simpleString: String = {
-    val orderByString = truncatedString(sortOrder, "[", ",", "]")
-    val outputString = truncatedString(output, "[", ",", "]")
+  override def simpleString(maxFields: Int): String = {
+    val orderByString = truncatedString(sortOrder, "[", ",", "]", maxFields)
+    val outputString = truncatedString(output, "[", ",", "]", maxFields)
 
     s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)"
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 8ad436a..38ecb0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, RateControlMicroBatchReadSupport}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
@@ -482,9 +483,10 @@ class MicroBatchExecution(
     val newBatchesPlan = logicalPlan transform {
       case StreamingExecutionRelation(source, output) =>
         newData.get(source).map { dataPlan =>
+          val maxFields = SQLConf.get.maxToStringFields
           assert(output.size == dataPlan.output.size,
-            s"Invalid batch: ${truncatedString(output, ",")} != " +
-              s"${truncatedString(dataPlan.output, ",")}")
+            s"Invalid batch: ${truncatedString(output, ",", maxFields)} != " +
+              s"${truncatedString(dataPlan.output, ",", maxFields)}")
 
           val aliases = output.zip(dataPlan.output).map { case (to, from) =>
             Alias(from, to.name)(exprId = to.exprId, explicitMetadata = Some(from.metadata))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index f0859aa..89033b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2StreamingScanExec, StreamingDataSourceV2Relation}
 import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, StreamingWriteSupportProvider}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
@@ -166,10 +167,10 @@ class ContinuousExecution(
         val readSupport = continuousSources(insertedSourceId)
         insertedSourceId += 1
         val newOutput = readSupport.fullSchema().toAttributes
-
+        val maxFields = SQLConf.get.maxToStringFields
         assert(output.size == newOutput.size,
-          s"Invalid reader: ${truncatedString(output, ",")} != " +
-            s"${truncatedString(newOutput, ",")}")
+          s"Invalid reader: ${truncatedString(output, ",", maxFields)} != " +
+            s"${truncatedString(newOutput, ",", maxFields)}")
         replacements ++= output.zip(newOutput)
 
         val loggedOffset = offsets.offsets(0)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index daee089..13b75ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Stati
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2}
 import org.apache.spark.sql.streaming.OutputMode
@@ -117,7 +118,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     }
   }
 
-  override def toString: String = s"MemoryStream[${truncatedString(output, ",")}]"
+  override def toString: String = {
+    s"MemoryStream[${truncatedString(output, ",", SQLConf.get.maxToStringFields)}]"
+  }
 
   override def deserializeOffset(json: String): OffsetV2 = LongOffset(json.toLong)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 310ebcd..e180d22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -51,7 +51,7 @@ case class ScalarSubquery(
   override def dataType: DataType = plan.schema.fields.head.dataType
   override def children: Seq[Expression] = Nil
   override def nullable: Boolean = true
-  override def toString: String = plan.simpleString
+  override def toString: String = plan.simpleString(SQLConf.get.maxToStringFields)
   override def withNewPlan(query: SubqueryExec): ScalarSubquery = copy(plan = query)
 
   override def semanticEquals(other: Expression): Boolean = other match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 0c47a20..3cc97c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -106,6 +106,19 @@ class QueryExecutionSuite extends SharedSQLContext {
     }
   }
 
+  test("check maximum fields restriction") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath + "/plans.txt"
+      val ds = spark.createDataset(Seq(QueryExecutionTestRecord(
+        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
+        16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26)))
+      ds.queryExecution.debug.toFile(path)
+      val localRelations = Source.fromFile(path).getLines().filter(_.contains("LocalRelation"))
+
+      assert(!localRelations.exists(_.contains("more fields")))
+    }
+  }
+
   test("toString() exception/error handling") {
     spark.experimental.extraStrategies = Seq(
         new SparkStrategy {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 608f21e..7249eac 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -83,7 +83,7 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand {
     tableDesc: CatalogTable,
     tableExists: Boolean): DataWritingCommand
 
-  override def argString: String = {
+  override def argString(maxFields: Int): String = {
     s"[Database:${tableDesc.database}, " +
     s"TableName: ${tableDesc.identifier.table}, " +
     s"InsertIntoHiveTable]"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org