Posted to commits@spark.apache.org by we...@apache.org on 2020/03/10 11:18:27 UTC

[spark] branch branch-3.0 updated: [SPARK-31079][SQL] Logging QueryExecutionMetering in RuleExecutor logger

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c238455  [SPARK-31079][SQL] Logging QueryExecutionMetering in RuleExecutor logger
c238455 is described below

commit c2384558086b0386a20aa8098cdf7a4c823a5f04
Author: Eric Wu <49...@qq.com>
AuthorDate: Tue Mar 10 19:08:59 2020 +0800

    [SPARK-31079][SQL] Logging QueryExecutionMetering in RuleExecutor logger
    
    ### What changes were proposed in this pull request?
    RuleExecutor already supports metering for analyzer/optimizer rules. By surfacing this information in `PlanChangeLogger`, users get more insight when debugging rule changes.
    
    This PR enhances `PlanChangeLogger` to display RuleExecutor metrics. This could be done by simply calling the existing APIs `resetMetrics` and `dumpTimeSpent`, but resetting the meter would conflict with users who are also collecting total metrics for a SQL job. Thus I introduced `QueryExecutionMetrics`, a snapshot of `QueryExecutionMetering`, to better support this feature.
    
    Information added to `PlanChangeLogger`:
    ```
    === Metrics of Executed Rules ===
    Total number of runs: 554
    Total time: 0.107756568 seconds
    Total number of effective runs: 11
    Total time of effective runs: 0.047615486 seconds
    ```
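    
    A minimal Scala sketch of the snapshot-and-diff pattern (illustration only: `QueryExecutionMetrics` and its `-` operator mirror the code added below, while the surrounding driver object is hypothetical):
    
    ```scala
    // Sketch: snapshot the global counters before running rule batches,
    // then subtract that snapshot afterwards to isolate this run's cost.
    case class QueryExecutionMetrics(
        time: Long,
        numRuns: Long,
        numEffectiveRuns: Long,
        timeEffective: Long) {
      def -(that: QueryExecutionMetrics): QueryExecutionMetrics =
        QueryExecutionMetrics(
          time - that.time,
          numRuns - that.numRuns,
          numEffectiveRuns - that.numEffectiveRuns,
          timeEffective - that.timeEffective)
    }
    
    object SnapshotDiffSketch {
      def main(args: Array[String]): Unit = {
        var global = QueryExecutionMetrics(0L, 0L, 0L, 0L)  // process-wide meter
        val before = global                                 // snapshot before execution
        global = QueryExecutionMetrics(100L, 5L, 2L, 40L)   // counters grow as rules run
        val delta = global - before                         // this execution only
        println(s"runs=${delta.numRuns}, effective runs=${delta.numEffectiveRuns}")
      }
    }
    ```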
    
    ### Why are the changes needed?
    Provide a better plan-change debugging experience.
    
    ### Does this PR introduce any user-facing change?
    No. It only adds more debugging information to the plan change log; the default log level is TRACE.
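    
    For example, to see these messages without enabling TRACE logging globally, the plan change log level can be raised (a sketch; assumes the `spark.sql.optimizer.planChangeLog.level` configuration available in Spark 3.0 and an active SparkSession named `spark`):
    
    ```scala
    // Raise the plan change log level so the new metrics appear at WARN.
    spark.conf.set("spark.sql.optimizer.planChangeLog.level", "WARN")
    spark.range(10).selectExpr("id + 1").collect()  // any query exercises the optimizer
    ```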
    
    ### How was this patch tested?
    Updated existing tests to verify the new logs.
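    
    (For reference, the verification pattern in `OptimizerLoggingSuite`, roughly: capture log4j events with a test appender and assert that the expected fragments appear. A simplified sketch assuming the log4j 1.x API that Spark 3.0 uses, not the suite's exact code:)
    
    ```scala
    import scala.collection.mutable.ArrayBuffer
    
    import org.apache.log4j.AppenderSkeleton
    import org.apache.log4j.spi.LoggingEvent
    
    // Collects every log event so tests can assert on rendered messages.
    class MockAppender extends AppenderSkeleton {
      val events = new ArrayBuffer[LoggingEvent]()
      override def append(event: LoggingEvent): Unit = events += event
      override def close(): Unit = {}
      override def requiresLayout(): Boolean = false
    }
    
    // After attaching the appender and running a query:
    //   assert(appender.events.map(_.getRenderedMessage)
    //     .exists(_.contains("Metrics of Executed Rules")))
    ```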
    
    Closes #27846 from Eric5553/ExplainRuleExecMetrics.
    
    Authored-by: Eric Wu <49...@qq.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 15df2a3f40c74cd3950cc48c95c330217e3ef401)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../catalyst/rules/QueryExecutionMetering.scala    | 27 ++++++++++++++++++++++
 .../spark/sql/catalyst/rules/RuleExecutor.scala    | 24 ++++++++++++++++++-
 .../catalyst/optimizer/OptimizerLoggingSuite.scala |  9 +++++++-
 3 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
index 875c46d..8efc359 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
@@ -37,6 +37,10 @@ case class QueryExecutionMetering() {
     timeEffectiveRunsMap.clear()
   }
 
+  def getMetrics(): QueryExecutionMetrics = {
+    QueryExecutionMetrics(totalTime, totalNumRuns, totalNumEffectiveRuns, totalEffectiveTime)
+  }
+
   def totalTime: Long = {
     timeMap.sum()
   }
@@ -45,6 +49,14 @@ case class QueryExecutionMetering() {
     numRunsMap.sum()
   }
 
+  def totalNumEffectiveRuns: Long = {
+    numEffectiveRunsMap.sum()
+  }
+
+  def totalEffectiveTime: Long = {
+    timeEffectiveRunsMap.sum()
+  }
+
   def incExecutionTimeBy(ruleName: String, delta: Long): Unit = {
     timeMap.addAndGet(ruleName, delta)
   }
@@ -95,3 +107,18 @@ case class QueryExecutionMetering() {
      """.stripMargin
   }
 }
+
+case class QueryExecutionMetrics(
+    time: Long,
+    numRuns: Long,
+    numEffectiveRuns: Long,
+    timeEffective: Long) {
+
+  def -(metrics: QueryExecutionMetrics): QueryExecutionMetrics = {
+    QueryExecutionMetrics(
+      this.time - metrics.time,
+      this.numRuns - metrics.numRuns,
+      this.numEffectiveRuns - metrics.numEffectiveRuns,
+      this.timeEffective - metrics.timeEffective)
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index da5242b..bff04d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -21,6 +21,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
@@ -37,6 +38,10 @@ object RuleExecutor {
   def resetMetrics(): Unit = {
     queryExecutionMeter.resetMetrics()
   }
+
+  def getCurrentMetrics(): QueryExecutionMetrics = {
+    queryExecutionMeter.getMetrics()
+  }
 }
 
 abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
@@ -121,6 +126,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
     val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
     val planChangeLogger = new PlanChangeLogger()
     val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get
+    val beforeMetrics = RuleExecutor.getCurrentMetrics()
 
     // Run the structural integrity checker against the initial input
     if (!isPlanIntegral(plan)) {
@@ -199,6 +205,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
 
       planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
     }
+    planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics)
 
     curPlan
   }
@@ -231,7 +238,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
             s"""
                |=== Result of Batch ${batchName} ===
                |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
-          """.stripMargin
+            """.stripMargin
           } else {
             s"Batch ${batchName} has no effect."
           }
@@ -241,6 +248,21 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
       }
     }
 
+    def logMetrics(metrics: QueryExecutionMetrics): Unit = {
+      val totalTime = metrics.time / NANOS_PER_SECOND.toDouble
+      val totalTimeEffective = metrics.timeEffective / NANOS_PER_SECOND.toDouble
+      val message =
+        s"""
+           |=== Metrics of Executed Rules ===
+           |Total number of runs: ${metrics.numRuns}
+           |Total time: ${totalTime} seconds
+           |Total number of effective runs: ${metrics.numEffectiveRuns}
+           |Total time of effective runs: ${totalTimeEffective} seconds
+        """.stripMargin
+
+      logBasedOnLevel(message)
+    }
+
     private def logBasedOnLevel(f: => String): Unit = {
       logLevel match {
         case "TRACE" => logTrace(f)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
index d3b0a0e..db22121 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
@@ -49,12 +49,19 @@ class OptimizerLoggingSuite extends PlanTest {
       case event => Seq(
         "Applying Rule",
         "Result of Batch",
-        "has no effect").exists(event.getRenderedMessage().contains)
+        "has no effect",
+        "Metrics of Executed Rules").exists(event.getRenderedMessage().contains)
     }
     val logMessages = events.map(_.getRenderedMessage)
     assert(expectedRulesOrBatches.forall
     (ruleOrBatch => logMessages.exists(_.contains(ruleOrBatch))))
     assert(events.forall(_.getLevel == expectedLevel))
+    val expectedMetrics = Seq(
+      "Total number of runs: 7",
+      "Total time:",
+      "Total number of effective runs: 3",
+      "Total time of effective runs:")
+    assert(expectedMetrics.forall(metrics => logMessages.exists(_.contains(metrics))))
   }
 
   test("test log level") {

