You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2018/12/27 10:13:40 UTC
[spark] branch master updated: [SPARK-26191][SQL] Control
truncation of Spark plans via maxFields parameter
This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a1c1dd3 [SPARK-26191][SQL] Control truncation of Spark plans via maxFields parameter
a1c1dd3 is described below
commit a1c1dd3484a4dcd7c38fe256e69dbaaaf10d1a92
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Thu Dec 27 11:13:16 2018 +0100
[SPARK-26191][SQL] Control truncation of Spark plans via maxFields parameter
## What changes were proposed in this pull request?
In the PR, I propose to add a `maxFields` parameter to all functions involved in the creation of textual representations of Spark plans, such as `simpleString` and `verboseString`. The new parameter restricts the number of fields converted to truncated strings. Any elements beyond the limit will be dropped and replaced by a `"... N more fields"` placeholder. The threshold is bumped up to `Int.MaxValue` for `toFile()`.
## How was this patch tested?
Added a test to `QueryExecutionSuite` which checks the impact of `maxFields` on the number of truncated fields in `LocalRelation`.
Closes #23159 from MaxGekk/to-file-max-fields.
Lead-authored-by: Maxim Gekk <ma...@gmail.com>
Co-authored-by: Maxim Gekk <ma...@databricks.com>
Signed-off-by: Herman van Hovell <hv...@databricks.com>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 4 +-
.../sql/catalyst/analysis/CheckAnalysis.scala | 8 ++--
.../spark/sql/catalyst/analysis/TypeCoercion.scala | 4 +-
.../sql/catalyst/encoders/ExpressionEncoder.scala | 8 ++--
.../sql/catalyst/expressions/Expression.scala | 6 +--
.../catalyst/expressions/codegen/javaCode.scala | 2 +-
.../sql/catalyst/expressions/generators.scala | 3 +-
.../expressions/higherOrderFunctions.scala | 4 +-
.../spark/sql/catalyst/expressions/misc.scala | 5 +-
.../catalyst/expressions/namedExpressions.scala | 4 +-
.../spark/sql/catalyst/plans/QueryPlan.scala | 4 +-
.../sql/catalyst/plans/logical/LogicalPlan.scala | 4 +-
.../plans/logical/basicLogicalOperators.scala | 8 ++--
.../apache/spark/sql/catalyst/trees/TreeNode.scala | 56 +++++++++++++---------
.../apache/spark/sql/catalyst/util/package.scala | 10 ++--
.../org/apache/spark/sql/types/StructType.scala | 6 ++-
.../expressions/aggregate/PercentileSuite.scala | 2 +-
.../org/apache/spark/sql/util/UtilSuite.scala | 2 +-
.../spark/sql/execution/DataSourceScanExec.scala | 12 ++---
.../apache/spark/sql/execution/ExistingRDD.scala | 6 +--
.../spark/sql/execution/QueryExecution.scala | 21 ++++----
.../apache/spark/sql/execution/SparkPlanInfo.scala | 6 ++-
.../sql/execution/WholeStageCodegenExec.scala | 28 ++++++++---
.../execution/aggregate/HashAggregateExec.scala | 12 ++---
.../aggregate/ObjectHashAggregateExec.scala | 12 ++---
.../execution/aggregate/SortAggregateExec.scala | 12 ++---
.../sql/execution/basicPhysicalOperators.scala | 4 +-
.../sql/execution/columnar/InMemoryRelation.scala | 4 +-
.../execution/datasources/LogicalRelation.scala | 4 +-
.../datasources/SaveIntoDataSourceCommand.scala | 2 +-
.../spark/sql/execution/datasources/ddl.scala | 2 +-
.../datasources/v2/DataSourceV2Relation.scala | 8 ++--
.../datasources/v2/DataSourceV2ScanExec.scala | 4 +-
.../v2/DataSourceV2StreamingScanExec.scala | 2 +-
.../datasources/v2/DataSourceV2StringFormat.scala | 6 +--
.../apache/spark/sql/execution/debug/package.scala | 3 +-
.../org/apache/spark/sql/execution/limit.scala | 6 +--
.../execution/streaming/MicroBatchExecution.scala | 6 ++-
.../streaming/continuous/ContinuousExecution.scala | 7 +--
.../spark/sql/execution/streaming/memory.scala | 5 +-
.../org/apache/spark/sql/execution/subquery.scala | 2 +-
.../spark/sql/execution/QueryExecutionSuite.scala | 13 +++++
.../execution/CreateHiveTableAsSelectCommand.scala | 2 +-
43 files changed, 203 insertions(+), 126 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7770531..198645d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -979,7 +979,7 @@ class Analyzer(
a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
case q: LogicalPlan =>
- logTrace(s"Attempting to resolve ${q.simpleString}")
+ logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
q.mapExpressions(resolveExpressionTopDown(_, q))
}
@@ -1777,7 +1777,7 @@ class Analyzer(
case p if p.expressions.exists(hasGenerator) =>
throw new AnalysisException("Generators are not supported outside the SELECT clause, but " +
- "got: " + p.simpleString)
+ "got: " + p.simpleString(SQLConf.get.maxToStringFields))
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 88d41e8..c28a978 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
/**
@@ -303,7 +304,7 @@ trait CheckAnalysis extends PredicateHelper {
val missingAttributes = o.missingInput.mkString(",")
val input = o.inputSet.mkString(",")
val msgForMissingAttributes = s"Resolved attribute(s) $missingAttributes missing " +
- s"from $input in operator ${operator.simpleString}."
+ s"from $input in operator ${operator.simpleString(SQLConf.get.maxToStringFields)}."
val resolver = plan.conf.resolver
val attrsWithSameName = o.missingInput.filter { missing =>
@@ -368,7 +369,7 @@ trait CheckAnalysis extends PredicateHelper {
s"""nondeterministic expressions are only allowed in
|Project, Filter, Aggregate or Window, found:
| ${o.expressions.map(_.sql).mkString(",")}
- |in operator ${operator.simpleString}
+ |in operator ${operator.simpleString(SQLConf.get.maxToStringFields)}
""".stripMargin)
case _: UnresolvedHint =>
@@ -380,7 +381,8 @@ trait CheckAnalysis extends PredicateHelper {
}
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
- case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}")
+ case o if !o.resolved =>
+ failAnalysis(s"unresolved operator ${o.simpleString(SQLConf.get.maxToStringFields)}")
case _ =>
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 1706b3e..b19aa50 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -1069,8 +1069,8 @@ trait TypeCoercionRule extends Rule[LogicalPlan] with Logging {
// Leave the same if the dataTypes match.
case Some(newType) if a.dataType == newType.dataType => a
case Some(newType) =>
- logDebug(
- s"Promoting $a from ${a.dataType} to ${newType.dataType} in ${q.simpleString}")
+ logDebug(s"Promoting $a from ${a.dataType} to ${newType.dataType} in " +
+ s" ${q.simpleString(SQLConf.get.maxToStringFields)}")
newType
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index fbf0bd6..da5c1fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection
import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, InitializeJavaBean, Invoke, NewInstance}
import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts
import org.apache.spark.sql.catalyst.plans.logical.{CatalystSerde, DeserializeToObject, LocalRelation}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ObjectType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -323,8 +324,8 @@ case class ExpressionEncoder[T](
extractProjection(inputRow)
} catch {
case e: Exception =>
- throw new RuntimeException(
- s"Error while encoding: $e\n${serializer.map(_.simpleString).mkString("\n")}", e)
+ throw new RuntimeException(s"Error while encoding: $e\n" +
+ s"${serializer.map(_.simpleString(SQLConf.get.maxToStringFields)).mkString("\n")}", e)
}
/**
@@ -336,7 +337,8 @@ case class ExpressionEncoder[T](
constructProjection(row).get(0, ObjectType(clsTag.runtimeClass)).asInstanceOf[T]
} catch {
case e: Exception =>
- throw new RuntimeException(s"Error while decoding: $e\n${deserializer.simpleString}", e)
+ throw new RuntimeException(s"Error while decoding: $e\n" +
+ s"${deserializer.simpleString(SQLConf.get.maxToStringFields)}", e)
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index c89c227..d5d1195 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -259,12 +259,12 @@ abstract class Expression extends TreeNode[Expression] {
// Marks this as final, Expression.verboseString should never be called, and thus shouldn't be
// overridden by concrete classes.
- final override def verboseString: String = simpleString
+ final override def verboseString(maxFields: Int): String = simpleString(maxFields)
- override def simpleString: String = toString
+ override def simpleString(maxFields: Int): String = toString
override def toString: String = prettyName + truncatedString(
- flatArguments.toSeq, "(", ", ", ")")
+ flatArguments.toSeq, "(", ", ", ")", SQLConf.get.maxToStringFields)
/**
* Returns SQL representation of this expression. For expressions extending [[NonSQLExpression]],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
index 17d4a0d..17fff64 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
@@ -197,7 +197,7 @@ trait Block extends TreeNode[Block] with JavaCode {
case _ => code"$this\n$other"
}
- override def verboseString: String = toString
+ override def verboseString(maxFields: Int): String = toString
}
object Block {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 9c74fdf..6b6da1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
/**
@@ -101,7 +102,7 @@ case class UserDefinedGenerator(
inputRow = new InterpretedProjection(children)
convertToScala = {
val inputSchema = StructType(children.map { e =>
- StructField(e.simpleString, e.dataType, nullable = true)
+ StructField(e.simpleString(SQLConf.get.maxToStringFields), e.dataType, nullable = true)
})
CatalystTypeConverters.createToScalaConverter(inputSchema)
}.asInstanceOf[InternalRow => Row]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
index 7141b6e..e6cc11d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
@@ -76,7 +76,9 @@ case class NamedLambdaVariable(
override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
- override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+ override def simpleString(maxFields: Int): String = {
+ s"lambda $name#${exprId.id}: ${dataType.simpleString(maxFields)}"
+ }
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index 0cdeda9..1f1decc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.RandomUUIDGenerator
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -40,7 +41,7 @@ case class PrintToStderr(child: Expression) extends UnaryExpression {
input
}
- private val outputPrefix = s"Result of ${child.simpleString} is "
+ private val outputPrefix = s"Result of ${child.simpleString(SQLConf.get.maxToStringFields)} is "
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val outputPrefixField = ctx.addReferenceObj("outputPrefix", outputPrefix)
@@ -72,7 +73,7 @@ case class AssertTrue(child: Expression) extends UnaryExpression with ImplicitCa
override def prettyName: String = "assert_true"
- private val errMsg = s"'${child.simpleString}' is not true!"
+ private val errMsg = s"'${child.simpleString(SQLConf.get.maxToStringFields)}' is not true!"
override def eval(input: InternalRow) : Any = {
val v = child.eval(input)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 02b48f9..131459b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -330,7 +330,9 @@ case class AttributeReference(
// Since the expression id is not in the first constructor it is missing from the default
// tree string.
- override def simpleString: String = s"$name#${exprId.id}: ${dataType.simpleString}"
+ override def simpleString(maxFields: Int): String = {
+ s"$name#${exprId.id}: ${dataType.simpleString(maxFields)}"
+ }
override def sql: String = {
val qualifierPrefix = if (qualifier.nonEmpty) qualifier.mkString(".") + "." else ""
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index ca0cea6..125181f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -172,9 +172,9 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
*/
protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""
- override def simpleString: String = statePrefix + super.simpleString
+ override def simpleString(maxFields: Int): String = statePrefix + super.simpleString(maxFields)
- override def verboseString: String = simpleString
+ override def verboseString(maxFields: Int): String = simpleString(maxFields)
/**
* All the subqueries of current plan.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 3ad2ee6..51e0f4b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -36,8 +36,8 @@ abstract class LogicalPlan
/** Returns true if this subtree has data from a streaming data source. */
def isStreaming: Boolean = children.exists(_.isStreaming == true)
- override def verboseStringWithSuffix: String = {
- super.verboseString + statsCache.map(", " + _.toString).getOrElse("")
+ override def verboseStringWithSuffix(maxFields: Int): String = {
+ super.verboseString(maxFields) + statsCache.map(", " + _.toString).getOrElse("")
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index a26ec4e..d8b3a4a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -468,7 +468,7 @@ case class View(
override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
- override def simpleString: String = {
+ override def simpleString(maxFields: Int): String = {
s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})"
}
}
@@ -484,8 +484,8 @@ case class View(
case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode {
override def output: Seq[Attribute] = child.output
- override def simpleString: String = {
- val cteAliases = truncatedString(cteRelations.map(_._1), "[", ", ", "]")
+ override def simpleString(maxFields: Int): String = {
+ val cteAliases = truncatedString(cteRelations.map(_._1), "[", ", ", "]", maxFields)
s"CTE $cteAliases"
}
@@ -557,7 +557,7 @@ case class Range(
override def newInstance(): Range = copy(output = output.map(_.newInstance()))
- override def simpleString: String = {
+ override def simpleString(maxFields: Int): String = {
s"Range ($start, $end, step=$step, splits=$numSlices)"
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 2e9f9f5..21e59bb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
@@ -433,17 +434,17 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
private lazy val allChildren: Set[TreeNode[_]] = (children ++ innerChildren).toSet[TreeNode[_]]
/** Returns a string representing the arguments to this node, minus any children */
- def argString: String = stringArgs.flatMap {
+ def argString(maxFields: Int): String = stringArgs.flatMap {
case tn: TreeNode[_] if allChildren.contains(tn) => Nil
case Some(tn: TreeNode[_]) if allChildren.contains(tn) => Nil
- case Some(tn: TreeNode[_]) => tn.simpleString :: Nil
- case tn: TreeNode[_] => tn.simpleString :: Nil
+ case Some(tn: TreeNode[_]) => tn.simpleString(maxFields) :: Nil
+ case tn: TreeNode[_] => tn.simpleString(maxFields) :: Nil
case seq: Seq[Any] if seq.toSet.subsetOf(allChildren.asInstanceOf[Set[Any]]) => Nil
case iter: Iterable[_] if iter.isEmpty => Nil
- case seq: Seq[_] => truncatedString(seq, "[", ", ", "]") :: Nil
- case set: Set[_] => truncatedString(set.toSeq, "{", ", ", "}") :: Nil
+ case seq: Seq[_] => truncatedString(seq, "[", ", ", "]", maxFields) :: Nil
+ case set: Set[_] => truncatedString(set.toSeq, "{", ", ", "}", maxFields) :: Nil
case array: Array[_] if array.isEmpty => Nil
- case array: Array[_] => truncatedString(array, "[", ", ", "]") :: Nil
+ case array: Array[_] => truncatedString(array, "[", ", ", "]", maxFields) :: Nil
case null => Nil
case None => Nil
case Some(null) => Nil
@@ -456,24 +457,33 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
case other => other :: Nil
}.mkString(", ")
- /** ONE line description of this node. */
- def simpleString: String = s"$nodeName $argString".trim
+ /**
+ * ONE line description of this node.
+ * @param maxFields Maximum number of fields that will be converted to strings.
+ * Any elements beyond the limit will be dropped.
+ */
+ def simpleString(maxFields: Int): String = {
+ s"$nodeName ${argString(maxFields)}".trim
+ }
/** ONE line description of this node with more information */
- def verboseString: String
+ def verboseString(maxFields: Int): String
/** ONE line description of this node with some suffix information */
- def verboseStringWithSuffix: String = verboseString
+ def verboseStringWithSuffix(maxFields: Int): String = verboseString(maxFields)
override def toString: String = treeString
/** Returns a string representation of the nodes in this tree */
def treeString: String = treeString(verbose = true)
- def treeString(verbose: Boolean, addSuffix: Boolean = false): String = {
+ def treeString(
+ verbose: Boolean,
+ addSuffix: Boolean = false,
+ maxFields: Int = SQLConf.get.maxToStringFields): String = {
val writer = new StringBuilderWriter()
try {
- treeString(writer, verbose, addSuffix)
+ treeString(writer, verbose, addSuffix, maxFields)
writer.toString
} finally {
writer.close()
@@ -483,8 +493,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
def treeString(
writer: Writer,
verbose: Boolean,
- addSuffix: Boolean): Unit = {
- generateTreeString(0, Nil, writer, verbose, "", addSuffix)
+ addSuffix: Boolean,
+ maxFields: Int): Unit = {
+ generateTreeString(0, Nil, writer, verbose, "", addSuffix, maxFields)
}
/**
@@ -550,7 +561,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
writer: Writer,
verbose: Boolean,
prefix: String = "",
- addSuffix: Boolean = false): Unit = {
+ addSuffix: Boolean = false,
+ maxFields: Int): Unit = {
if (depth > 0) {
lastChildren.init.foreach { isLast =>
@@ -560,9 +572,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
}
val str = if (verbose) {
- if (addSuffix) verboseStringWithSuffix else verboseString
+ if (addSuffix) verboseStringWithSuffix(maxFields) else verboseString(maxFields)
} else {
- simpleString
+ simpleString(maxFields)
}
writer.write(prefix)
writer.write(str)
@@ -571,17 +583,17 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
if (innerChildren.nonEmpty) {
innerChildren.init.foreach(_.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
- addSuffix = addSuffix))
+ addSuffix = addSuffix, maxFields = maxFields))
innerChildren.last.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
- addSuffix = addSuffix)
+ addSuffix = addSuffix, maxFields = maxFields)
}
if (children.nonEmpty) {
children.init.foreach(_.generateTreeString(
- depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix))
+ depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix, maxFields))
children.last.generateTreeString(
- depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix)
+ depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix, maxFields)
}
}
@@ -664,7 +676,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
t.forall(_.isInstanceOf[Partitioning]) || t.forall(_.isInstanceOf[DataType]) =>
JArray(t.map(parseToJson).toList)
case t: Seq[_] if t.length > 0 && t.head.isInstanceOf[String] =>
- JString(truncatedString(t, "[", ", ", "]"))
+ JString(truncatedString(t, "[", ", ", "]", SQLConf.get.maxToStringFields))
case t: Seq[_] => JNull
case m: Map[_, _] => JNull
// if it's a scala object, we can simply keep the full class path.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 277584b..7f5860e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -184,14 +184,14 @@ package object util extends Logging {
start: String,
sep: String,
end: String,
- maxNumFields: Int = SQLConf.get.maxToStringFields): String = {
- if (seq.length > maxNumFields) {
+ maxFields: Int): String = {
+ if (seq.length > maxFields) {
if (truncationWarningPrinted.compareAndSet(false, true)) {
logWarning(
"Truncated the string representation of a plan since it was too large. This " +
s"behavior can be adjusted by setting '${SQLConf.MAX_TO_STRING_FIELDS.key}'.")
}
- val numFields = math.max(0, maxNumFields - 1)
+ val numFields = math.max(0, maxFields - 1)
seq.take(numFields).mkString(
start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end)
} else {
@@ -200,7 +200,9 @@ package object util extends Logging {
}
/** Shorthand for calling truncatedString() without start or end strings. */
- def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "")
+ def truncatedString[T](seq: Seq[T], sep: String, maxFields: Int): String = {
+ truncatedString(seq, "", sep, "", maxFields)
+ }
/* FIX ME
implicit class debugLogging(a: Any) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index e01d7c5..d563276 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser}
import org.apache.spark.sql.catalyst.util.{quoteIdentifier, truncatedString}
+import org.apache.spark.sql.internal.SQLConf
/**
* A [[StructType]] object can be constructed by
@@ -343,7 +344,10 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
override def simpleString: String = {
val fieldTypes = fields.view.map(field => s"${field.name}:${field.dataType.simpleString}")
- truncatedString(fieldTypes, "struct<", ",", ">")
+ truncatedString(
+ fieldTypes,
+ "struct<", ",", ">",
+ SQLConf.get.maxToStringFields)
}
override def catalogString: String = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala
index 63c7b42..0e0c8e1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala
@@ -215,7 +215,7 @@ class PercentileSuite extends SparkFunSuite {
val percentile2 = new Percentile(child, percentage)
assertEqual(percentile2.checkInputDataTypes(),
TypeCheckFailure(s"Percentage(s) must be between 0.0 and 1.0, " +
- s"but got ${percentage.simpleString}"))
+ s"but got ${percentage.simpleString(100)}"))
}
val nonFoldablePercentage = Seq(NonFoldableLiteral(0.5),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/UtilSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/UtilSuite.scala
index 9c16202..d95de71 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/UtilSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/UtilSuite.scala
@@ -26,6 +26,6 @@ class UtilSuite extends SparkFunSuite {
assert(truncatedString(Seq(1, 2), "[", ", ", "]", 2) == "[1, 2]")
assert(truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2) == "[1, ... 2 more fields]")
assert(truncatedString(Seq(1, 2, 3), "[", ", ", "]", -5) == "[, ... 3 more fields]")
- assert(truncatedString(Seq(1, 2, 3), ", ") == "1, 2, 3")
+ assert(truncatedString(Seq(1, 2, 3), ", ", 10) == "1, 2, 3")
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 322ffff..1d7dd73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -52,19 +52,19 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
// Metadata that describes more details of this scan.
protected def metadata: Map[String, String]
- override def simpleString: String = {
+ override def simpleString(maxFields: Int): String = {
val metadataEntries = metadata.toSeq.sorted.map {
case (key, value) =>
key + ": " + StringUtils.abbreviate(redact(value), 100)
}
- val metadataStr = truncatedString(metadataEntries, " ", ", ", "")
- s"$nodeNamePrefix$nodeName${truncatedString(output, "[", ",", "]")}$metadataStr"
+ val metadataStr = truncatedString(metadataEntries, " ", ", ", "", maxFields)
+ s"$nodeNamePrefix$nodeName${truncatedString(output, "[", ",", "]", maxFields)}$metadataStr"
}
- override def verboseString: String = redact(super.verboseString)
+ override def verboseString(maxFields: Int): String = redact(super.verboseString(maxFields))
- override def treeString(verbose: Boolean, addSuffix: Boolean): String = {
- redact(super.treeString(verbose, addSuffix))
+ override def treeString(verbose: Boolean, addSuffix: Boolean, maxFields: Int): String = {
+ redact(super.treeString(verbose, addSuffix, maxFields))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 49fb288..981ecae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -79,7 +79,7 @@ case class ExternalRDDScanExec[T](
}
}
- override def simpleString: String = {
+ override def simpleString(maxFields: Int): String = {
s"$nodeName${output.mkString("[", ",", "]")}"
}
}
@@ -156,8 +156,8 @@ case class RDDScanExec(
}
}
- override def simpleString: String = {
- s"$nodeName${truncatedString(output, "[", ",", "]")}"
+ override def simpleString(maxFields: Int): String = {
+ s"$nodeName${truncatedString(output, "[", ",", "]", maxFields)}"
}
// Input can be InternalRow, has to be turned into UnsafeRows.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index eef5a3f..9b8d2e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
import org.apache.spark.util.Utils
@@ -208,27 +209,27 @@ class QueryExecution(
}
}
- private def writePlans(writer: Writer): Unit = {
+ private def writePlans(writer: Writer, maxFields: Int): Unit = {
val (verbose, addSuffix) = (true, false)
writer.write("== Parsed Logical Plan ==\n")
- writeOrError(writer)(logical.treeString(_, verbose, addSuffix))
+ writeOrError(writer)(logical.treeString(_, verbose, addSuffix, maxFields))
writer.write("\n== Analyzed Logical Plan ==\n")
val analyzedOutput = stringOrError(truncatedString(
- analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", "))
+ analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields))
writer.write(analyzedOutput)
writer.write("\n")
- writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix))
+ writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix, maxFields))
writer.write("\n== Optimized Logical Plan ==\n")
- writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix))
+ writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix, maxFields))
writer.write("\n== Physical Plan ==\n")
- writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix))
+ writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix, maxFields))
}
override def toString: String = withRedaction {
val writer = new StringBuilderWriter()
try {
- writePlans(writer)
+ writePlans(writer, SQLConf.get.maxToStringFields)
writer.toString
} finally {
writer.close()
@@ -280,14 +281,16 @@ class QueryExecution(
/**
* Dumps debug information about query execution into the specified file.
+ *
+ * @param maxFields maximum number of fields converted to string representation.
*/
- def toFile(path: String): Unit = {
+ def toFile(path: String, maxFields: Int = Int.MaxValue): Unit = {
val filePath = new Path(path)
val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
try {
- writePlans(writer)
+ writePlans(writer, maxFields)
writer.write("\n== Whole Stage Codegen ==\n")
org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan)
} finally {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 59ffd16..f554ff0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo
+import org.apache.spark.sql.internal.SQLConf
/**
* :: DeveloperApi ::
@@ -62,7 +63,10 @@ private[execution] object SparkPlanInfo {
case fileScan: FileSourceScanExec => fileScan.metadata
case _ => Map[String, String]()
}
- new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
+ new SparkPlanInfo(
+ plan.nodeName,
+ plan.simpleString(SQLConf.get.maxToStringFields),
+ children.map(fromSparkPlan),
metadata, metrics)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index fbda0d8..f4927de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -87,7 +87,7 @@ trait CodegenSupport extends SparkPlan {
this.parent = parent
ctx.freshNamePrefix = variablePrefix
s"""
- |${ctx.registerComment(s"PRODUCE: ${this.simpleString}")}
+ |${ctx.registerComment(s"PRODUCE: ${this.simpleString(SQLConf.get.maxToStringFields)}")}
|${doProduce(ctx)}
""".stripMargin
}
@@ -188,7 +188,7 @@ trait CodegenSupport extends SparkPlan {
parent.doConsume(ctx, inputVars, rowVar)
}
s"""
- |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
+ |${ctx.registerComment(s"CONSUME: ${parent.simpleString(SQLConf.get.maxToStringFields)}")}
|$evaluated
|$consumeFunc
""".stripMargin
@@ -496,8 +496,16 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
writer: Writer,
verbose: Boolean,
prefix: String = "",
- addSuffix: Boolean = false): Unit = {
- child.generateTreeString(depth, lastChildren, writer, verbose, prefix = "", addSuffix = false)
+ addSuffix: Boolean = false,
+ maxFields: Int): Unit = {
+ child.generateTreeString(
+ depth,
+ lastChildren,
+ writer,
+ verbose,
+ prefix = "",
+ addSuffix = false,
+ maxFields)
}
override def needCopyResult: Boolean = false
@@ -772,8 +780,16 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
writer: Writer,
verbose: Boolean,
prefix: String = "",
- addSuffix: Boolean = false): Unit = {
- child.generateTreeString(depth, lastChildren, writer, verbose, s"*($codegenStageId) ", false)
+ addSuffix: Boolean = false,
+ maxFields: Int): Unit = {
+ child.generateTreeString(
+ depth,
+ lastChildren,
+ writer,
+ verbose,
+ s"*($codegenStageId) ",
+ false,
+ maxFields)
}
override def needStopCheck: Boolean = true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 4827f83..2355d30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -922,18 +922,18 @@ case class HashAggregateExec(
"""
}
- override def verboseString: String = toString(verbose = true)
+ override def verboseString(maxFields: Int): String = toString(verbose = true, maxFields)
- override def simpleString: String = toString(verbose = false)
+ override def simpleString(maxFields: Int): String = toString(verbose = false, maxFields)
- private def toString(verbose: Boolean): String = {
+ private def toString(verbose: Boolean, maxFields: Int): String = {
val allAggregateExpressions = aggregateExpressions
testFallbackStartsAt match {
case None =>
- val keyString = truncatedString(groupingExpressions, "[", ", ", "]")
- val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]")
- val outputString = truncatedString(output, "[", ", ", "]")
+ val keyString = truncatedString(groupingExpressions, "[", ", ", "]", maxFields)
+ val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields)
+ val outputString = truncatedString(output, "[", ", ", "]", maxFields)
if (verbose) {
s"HashAggregate(keys=$keyString, functions=$functionString, output=$outputString)"
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
index 7145bb0..bd52c63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
@@ -137,15 +137,15 @@ case class ObjectHashAggregateExec(
}
}
- override def verboseString: String = toString(verbose = true)
+ override def verboseString(maxFields: Int): String = toString(verbose = true, maxFields)
- override def simpleString: String = toString(verbose = false)
+ override def simpleString(maxFields: Int): String = toString(verbose = false, maxFields)
- private def toString(verbose: Boolean): String = {
+ private def toString(verbose: Boolean, maxFields: Int): String = {
val allAggregateExpressions = aggregateExpressions
- val keyString = truncatedString(groupingExpressions, "[", ", ", "]")
- val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]")
- val outputString = truncatedString(output, "[", ", ", "]")
+ val keyString = truncatedString(groupingExpressions, "[", ", ", "]", maxFields)
+ val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields)
+ val outputString = truncatedString(output, "[", ", ", "]", maxFields)
if (verbose) {
s"ObjectHashAggregate(keys=$keyString, functions=$functionString, output=$outputString)"
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index d732b90..7ab6ecc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -107,16 +107,16 @@ case class SortAggregateExec(
}
}
- override def simpleString: String = toString(verbose = false)
+ override def simpleString(maxFields: Int): String = toString(verbose = false, maxFields)
- override def verboseString: String = toString(verbose = true)
+ override def verboseString(maxFields: Int): String = toString(verbose = true, maxFields)
- private def toString(verbose: Boolean): String = {
+ private def toString(verbose: Boolean, maxFields: Int): String = {
val allAggregateExpressions = aggregateExpressions
- val keyString = truncatedString(groupingExpressions, "[", ", ", "]")
- val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]")
- val outputString = truncatedString(output, "[", ", ", "]")
+ val keyString = truncatedString(groupingExpressions, "[", ", ", "]", maxFields)
+ val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields)
+ val outputString = truncatedString(output, "[", ", ", "]", maxFields)
if (verbose) {
s"SortAggregate(key=$keyString, functions=$functionString, output=$outputString)"
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 09effe0..2570b36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -586,7 +586,9 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
}
}
- override def simpleString: String = s"Range ($start, $end, step=$step, splits=$numSlices)"
+ override def simpleString(maxFields: Int): String = {
+ s"Range ($start, $end, step=$step, splits=$numSlices)"
+ }
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 73eb65f..4109d99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -209,6 +209,6 @@ case class InMemoryRelation(
override protected def otherCopyArgs: Seq[AnyRef] = Seq(statsOfPlanToCache)
- override def simpleString: String =
- s"InMemoryRelation [${truncatedString(output, ", ")}], ${cacheBuilder.storageLevel}"
+ override def simpleString(maxFields: Int): String =
+ s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}], ${cacheBuilder.storageLevel}"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 1023572..db3604f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -63,7 +63,9 @@ case class LogicalRelation(
case _ => // Do nothing.
}
- override def simpleString: String = s"Relation[${truncatedString(output, ",")}] $relation"
+ override def simpleString(maxFields: Int): String = {
+ s"Relation[${truncatedString(output, ",", maxFields)}] $relation"
+ }
}
object LogicalRelation {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
index 00b1b5d..f29e786 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
@@ -48,7 +48,7 @@ case class SaveIntoDataSourceCommand(
Seq.empty[Row]
}
- override def simpleString: String = {
+ override def simpleString(maxFields: Int): String = {
val redacted = SQLConf.get.redactOptions(options)
s"SaveIntoDataSourceCommand ${dataSource}, ${redacted}, ${mode}"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index fdc5e85..042320e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -68,7 +68,7 @@ case class CreateTempViewUsing(
s"Temporary view '$tableIdent' should not have specified a database")
}
- override def argString: String = {
+ override def argString(maxFields: Int): String = {
s"[tableIdent:$tableIdent " +
userSpecifiedSchema.map(_ + " ").getOrElse("") +
s"replace:$replace " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 0a6b0af..7bf2b8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -52,8 +52,8 @@ case class DataSourceV2Relation(
override def name: String = table.name()
- override def simpleString: String = {
- s"RelationV2${truncatedString(output, "[", ", ", "]")} $name"
+ override def simpleString(maxFields: Int): String = {
+ s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
}
def newWriteSupport(): BatchWriteSupport = source.createWriteSupport(options, schema)
@@ -96,7 +96,9 @@ case class StreamingDataSourceV2Relation(
override def isStreaming: Boolean = true
- override def simpleString: String = "Streaming RelationV2 " + metadataString
+ override def simpleString(maxFields: Int): String = {
+ "Streaming RelationV2 " + metadataString(maxFields)
+ }
override def pushedFilters: Seq[Expression] = Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 725bcc3..53e4e77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -35,8 +35,8 @@ case class DataSourceV2ScanExec(
@transient batch: Batch)
extends LeafExecNode with ColumnarBatchScan {
- override def simpleString: String = {
- s"ScanV2${truncatedString(output, "[", ", ", "]")} $scanDesc"
+ override def simpleString(maxFields: Int): String = {
+ s"ScanV2${truncatedString(output, "[", ", ", "]", maxFields)} $scanDesc"
}
// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala
index c872940..be75fe4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala
@@ -42,7 +42,7 @@ case class DataSourceV2StreamingScanExec(
@transient scanConfig: ScanConfig)
extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
- override def simpleString: String = "ScanV2 " + metadataString
+ override def simpleString(maxFields: Int): String = "ScanV2 " + metadataString(maxFields)
// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
index e829f62..f11703c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
@@ -59,7 +59,7 @@ trait DataSourceV2StringFormat {
case _ => Utils.getSimpleName(source.getClass)
}
- def metadataString: String = {
+ def metadataString(maxFields: Int): String = {
val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
if (pushedFilters.nonEmpty) {
@@ -73,12 +73,12 @@ trait DataSourceV2StringFormat {
}.mkString("[", ",", "]")
}
- val outputStr = truncatedString(output, "[", ", ", "]")
+ val outputStr = truncatedString(output, "[", ", ", "]", maxFields)
val entriesStr = if (entries.nonEmpty) {
truncatedString(entries.map {
case (key, value) => key + ": " + StringUtils.abbreviate(value, 100)
- }, " (", ", ", ")")
+ }, " (", ", ", ")", maxFields)
} else {
""
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 3511cef..ae8197f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, Codegen
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
@@ -216,7 +217,7 @@ package object debug {
val columnStats: Array[ColumnMetrics] = Array.fill(child.output.size)(new ColumnMetrics())
def dumpStats(): Unit = {
- debugPrint(s"== ${child.simpleString} ==")
+ debugPrint(s"== ${child.simpleString(SQLConf.get.maxToStringFields)} ==")
debugPrint(s"Tuples output: ${tupleCount.value}")
child.output.zip(columnStats).foreach { case (attr, metric) =>
// This is called on driver. All accumulator updates have a fixed value. So it's safe to use
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index bfaf080..56973af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -198,9 +198,9 @@ case class TakeOrderedAndProjectExec(
override def outputPartitioning: Partitioning = SinglePartition
- override def simpleString: String = {
- val orderByString = truncatedString(sortOrder, "[", ",", "]")
- val outputString = truncatedString(output, "[", ",", "]")
+ override def simpleString(maxFields: Int): String = {
+ val orderByString = truncatedString(sortOrder, "[", ",", "]", maxFields)
+ val outputString = truncatedString(output, "[", ",", "]", maxFields)
s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 8ad436a..38ecb0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, RateControlMicroBatchReadSupport}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
@@ -482,9 +483,10 @@ class MicroBatchExecution(
val newBatchesPlan = logicalPlan transform {
case StreamingExecutionRelation(source, output) =>
newData.get(source).map { dataPlan =>
+ val maxFields = SQLConf.get.maxToStringFields
assert(output.size == dataPlan.output.size,
- s"Invalid batch: ${truncatedString(output, ",")} != " +
- s"${truncatedString(dataPlan.output, ",")}")
+ s"Invalid batch: ${truncatedString(output, ",", maxFields)} != " +
+ s"${truncatedString(dataPlan.output, ",", maxFields)}")
val aliases = output.zip(dataPlan.output).map { case (to, from) =>
Alias(from, to.name)(exprId = to.exprId, explicitMetadata = Some(from.metadata))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index f0859aa..89033b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2StreamingScanExec, StreamingDataSourceV2Relation}
import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2
import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, StreamingWriteSupportProvider}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
@@ -166,10 +167,10 @@ class ContinuousExecution(
val readSupport = continuousSources(insertedSourceId)
insertedSourceId += 1
val newOutput = readSupport.fullSchema().toAttributes
-
+ val maxFields = SQLConf.get.maxToStringFields
assert(output.size == newOutput.size,
- s"Invalid reader: ${truncatedString(output, ",")} != " +
- s"${truncatedString(newOutput, ",")}")
+ s"Invalid reader: ${truncatedString(output, ",", maxFields)} != " +
+ s"${truncatedString(newOutput, ",", maxFields)}")
replacements ++= output.zip(newOutput)
val loggedOffset = offsets.offsets(0)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index daee089..13b75ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Stati
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2}
import org.apache.spark.sql.streaming.OutputMode
@@ -117,7 +118,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}
}
- override def toString: String = s"MemoryStream[${truncatedString(output, ",")}]"
+ override def toString: String = {
+ s"MemoryStream[${truncatedString(output, ",", SQLConf.get.maxToStringFields)}]"
+ }
override def deserializeOffset(json: String): OffsetV2 = LongOffset(json.toLong)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 310ebcd..e180d22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -51,7 +51,7 @@ case class ScalarSubquery(
override def dataType: DataType = plan.schema.fields.head.dataType
override def children: Seq[Expression] = Nil
override def nullable: Boolean = true
- override def toString: String = plan.simpleString
+ override def toString: String = plan.simpleString(SQLConf.get.maxToStringFields)
override def withNewPlan(query: SubqueryExec): ScalarSubquery = copy(plan = query)
override def semanticEquals(other: Expression): Boolean = other match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 0c47a20..3cc97c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -106,6 +106,19 @@ class QueryExecutionSuite extends SharedSQLContext {
}
}
+ test("check maximum fields restriction") {
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath + "/plans.txt"
+ val ds = spark.createDataset(Seq(QueryExecutionTestRecord(
+ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
+ 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26)))
+ ds.queryExecution.debug.toFile(path)
+ val localRelations = Source.fromFile(path).getLines().filter(_.contains("LocalRelation"))
+
+ assert(!localRelations.exists(_.contains("more fields")))
+ }
+ }
+
test("toString() exception/error handling") {
spark.experimental.extraStrategies = Seq(
new SparkStrategy {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 608f21e..7249eac 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -83,7 +83,7 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand {
tableDesc: CatalogTable,
tableExists: Boolean): DataWritingCommand
- override def argString: String = {
+ override def argString(maxFields: Int): String = {
s"[Database:${tableDesc.database}, " +
s"TableName: ${tableDesc.identifier.table}, " +
s"InsertIntoHiveTable]"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org