Posted to commits@spark.apache.org by we...@apache.org on 2020/03/10 11:18:27 UTC
[spark] branch branch-3.0 updated: [SPARK-31079][SQL] Logging QueryExecutionMetering in RuleExecutor logger
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c238455 [SPARK-31079][SQL] Logging QueryExecutionMetering in RuleExecutor logger
c238455 is described below
commit c2384558086b0386a20aa8098cdf7a4c823a5f04
Author: Eric Wu <49...@qq.com>
AuthorDate: Tue Mar 10 19:08:59 2020 +0800
[SPARK-31079][SQL] Logging QueryExecutionMetering in RuleExecutor logger
### What changes were proposed in this pull request?
RuleExecutor already supports metering for analyzer/optimizer rules. By surfacing this information in `PlanChangeLogger`, users get more context when debugging rule changes.
This PR enhances `PlanChangeLogger` to display RuleExecutor metrics. This could be done naively by calling the existing APIs `resetMetrics` and `dumpTimeSpent`, but resetting would conflict with a user who is also collecting the total metrics of a SQL job. Thus this PR introduces `QueryExecutionMetrics`, an immutable snapshot of `QueryExecutionMetering`, to better support this feature; a usage sketch follows the sample output below.
Information added to `PlanChangeLogger`:
```
=== Metrics of Executed Rules ===
Total number of runs: 554
Total time: 0.107756568 seconds
Total number of effective runs: 11
Total time of effective runs: 0.047615486 seconds
```
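As a hedged illustration of the snapshot-diff pattern described above, the sketch below uses only the `RuleExecutor.getCurrentMetrics()` and `QueryExecutionMetrics` `-` operator added in the diff further down; the surrounding call site is hypothetical:
```scala
// Sketch of the snapshot-diff pattern this PR uses internally, built only on
// the API added in the diff below. The call site itself is hypothetical.
import org.apache.spark.sql.catalyst.rules.RuleExecutor

val before = RuleExecutor.getCurrentMetrics() // immutable snapshot of the global meter
// ... run analyzer/optimizer batches here ...
val delta = RuleExecutor.getCurrentMetrics() - before
// `delta` holds only the runs/time of this execution; unlike resetMetrics(),
// nothing was cleared, so a job-wide metrics collector is unaffected.
println(s"runs=${delta.numRuns}, effective=${delta.numEffectiveRuns}")
```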
### Why are the changes needed?
Provides a better plan-change debugging experience.
### Does this PR introduce any user-facing change?
No. It only adds more debugging info to the `planChangeLog` output; the default log level is TRACE.
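As a minimal sketch of how a user might surface this block without enabling TRACE globally, one can raise the plan change log level. The config name below is the one used in branch-3.0's SQLConf (`spark.sql.optimizer.planChangeLog.level`); the session setup is a hypothetical example, not part of this PR:
```scala
// Hypothetical local session; only the planChangeLog config is essential here.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.optimizer.planChangeLog.level", "WARN") // lift from default TRACE
  .getOrCreate()

// Any query that exercises the optimizer will now log the
// "=== Metrics of Executed Rules ===" block at WARN level.
spark.range(10).filter("id > 5").collect()
```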
### How was this patch tested?
Updated existing tests to verify the new logs.
Closes #27846 from Eric5553/ExplainRuleExecMetrics.
Authored-by: Eric Wu <49...@qq.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 15df2a3f40c74cd3950cc48c95c330217e3ef401)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../catalyst/rules/QueryExecutionMetering.scala | 27 ++++++++++++++++++++++
.../spark/sql/catalyst/rules/RuleExecutor.scala | 24 ++++++++++++++++++-
.../catalyst/optimizer/OptimizerLoggingSuite.scala | 9 +++++++-
3 files changed, 58 insertions(+), 2 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
index 875c46d..8efc359 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
@@ -37,6 +37,10 @@ case class QueryExecutionMetering() {
timeEffectiveRunsMap.clear()
}
+ def getMetrics(): QueryExecutionMetrics = {
+ QueryExecutionMetrics(totalTime, totalNumRuns, totalNumEffectiveRuns, totalEffectiveTime)
+ }
+
def totalTime: Long = {
timeMap.sum()
}
@@ -45,6 +49,14 @@ case class QueryExecutionMetering() {
numRunsMap.sum()
}
+ def totalNumEffectiveRuns: Long = {
+ numEffectiveRunsMap.sum()
+ }
+
+ def totalEffectiveTime: Long = {
+ timeEffectiveRunsMap.sum()
+ }
+
def incExecutionTimeBy(ruleName: String, delta: Long): Unit = {
timeMap.addAndGet(ruleName, delta)
}
@@ -95,3 +107,18 @@ case class QueryExecutionMetering() {
""".stripMargin
}
}
+
+case class QueryExecutionMetrics(
+ time: Long,
+ numRuns: Long,
+ numEffectiveRuns: Long,
+ timeEffective: Long) {
+
+ def -(metrics: QueryExecutionMetrics): QueryExecutionMetrics = {
+ QueryExecutionMetrics(
+ this.time - metrics.time,
+ this.numRuns - metrics.numRuns,
+ this.numEffectiveRuns - metrics.numEffectiveRuns,
+ this.timeEffective - metrics.timeEffective)
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index da5242b..bff04d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -21,6 +21,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
@@ -37,6 +38,10 @@ object RuleExecutor {
def resetMetrics(): Unit = {
queryExecutionMeter.resetMetrics()
}
+
+ def getCurrentMetrics(): QueryExecutionMetrics = {
+ queryExecutionMeter.getMetrics()
+ }
}
abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
@@ -121,6 +126,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
val planChangeLogger = new PlanChangeLogger()
val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get
+ val beforeMetrics = RuleExecutor.getCurrentMetrics()
// Run the structural integrity checker against the initial input
if (!isPlanIntegral(plan)) {
@@ -199,6 +205,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
}
+ planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics)
curPlan
}
@@ -231,7 +238,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
s"""
|=== Result of Batch ${batchName} ===
|${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
- """.stripMargin
+ """.stripMargin
} else {
s"Batch ${batchName} has no effect."
}
@@ -241,6 +248,21 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
}
}
+ def logMetrics(metrics: QueryExecutionMetrics): Unit = {
+ val totalTime = metrics.time / NANOS_PER_SECOND.toDouble
+ val totalTimeEffective = metrics.timeEffective / NANOS_PER_SECOND.toDouble
+ val message =
+ s"""
+ |=== Metrics of Executed Rules ===
+ |Total number of runs: ${metrics.numRuns}
+ |Total time: ${totalTime} seconds
+ |Total number of effective runs: ${metrics.numEffectiveRuns}
+ |Total time of effective runs: ${totalTimeEffective} seconds
+ """.stripMargin
+
+ logBasedOnLevel(message)
+ }
+
private def logBasedOnLevel(f: => String): Unit = {
logLevel match {
case "TRACE" => logTrace(f)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
index d3b0a0e..db22121 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
@@ -49,12 +49,19 @@ class OptimizerLoggingSuite extends PlanTest {
case event => Seq(
"Applying Rule",
"Result of Batch",
- "has no effect").exists(event.getRenderedMessage().contains)
+ "has no effect",
+ "Metrics of Executed Rules").exists(event.getRenderedMessage().contains)
}
val logMessages = events.map(_.getRenderedMessage)
assert(expectedRulesOrBatches.forall
(ruleOrBatch => logMessages.exists(_.contains(ruleOrBatch))))
assert(events.forall(_.getLevel == expectedLevel))
+ val expectedMetrics = Seq(
+ "Total number of runs: 7",
+ "Total time:",
+ "Total number of effective runs: 3",
+ "Total time of effective runs:")
+ assert(expectedMetrics.forall(metrics => logMessages.exists(_.contains(metrics))))
}
test("test log level") {