Posted to commits@spark.apache.org by gu...@apache.org on 2019/04/11 01:02:45 UTC

[spark] branch master updated: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0745333  [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
0745333 is described below

commit 074533334d01afdd7862a1ac6c5a7a672bcce3f8
Author: chakravarthiT <45...@users.noreply.github.com>
AuthorDate: Thu Apr 11 10:02:27 2019 +0900

    [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
    
    ## What changes were proposed in this pull request?
    
    Similar to #22406, which made the log level for plan changes by each rule configurable, this PR makes the log level for plan changes by each batch configurable as well, reusing the same configuration: "spark.sql.optimizer.planChangeLog.level".
    
    Config proposed in this PR:
    spark.sql.optimizer.planChangeLog.batches - enables plan change logging only for a set of specified batches, separated by commas.
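    
    For illustration, a minimal spark-shell sketch of how the two configurations combine (a sketch assuming this patch is applied; the query is arbitrary, and "Resolution"/"Subquery" are the batch names used in the screenshots below):
    
        // Log plan changes at WARN instead of the default TRACE.
        spark.conf.set("spark.sql.optimizer.planChangeLog.level", "warn")
        // Restrict plan change logging to these two batches only.
        spark.conf.set("spark.sql.optimizer.planChangeLog.batches", "Resolution,Subquery")
    
        // Any query exercises the rule executors; only matching batches are logged.
        spark.range(10).filter("id > 5").show()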
    
    ## How was this patch tested?
    
    Added unit tests; also tested manually, with screenshots attached below.
    
    1) Setting spark.sql.optimizer.planChangeLog.level to warn.
    
    ![settingLogLevelToWarn](https://user-images.githubusercontent.com/45845595/54556730-8803dd00-49df-11e9-95ab-ebb0c8d735ef.png)
    
    2) Setting spark.sql.optimizer.planChangeLog.batches to Resolution and Subquery.
    ![settingBatchestoLog](https://user-images.githubusercontent.com/45845595/54556740-8cc89100-49df-11e9-80ab-fbbbe1ff2cdf.png)
    
    3) Plan change logging enabled only for the specified batches (Resolution and Subquery).
    ![batchloggingOp](https://user-images.githubusercontent.com/45845595/54556788-ab2e8c80-49df-11e9-9ae0-57815f552896.png)
    
    Closes #24136 from chakravarthiT/logBatches.
    
    Lead-authored-by: chakravarthiT <45...@users.noreply.github.com>
    Co-authored-by: chakravarthiT <tc...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../spark/sql/catalyst/rules/RuleExecutor.scala    | 55 ++++++++++++++--------
 .../org/apache/spark/sql/internal/SQLConf.scala    | 18 +++++--
 .../catalyst/optimizer/OptimizerLoggingSuite.scala | 45 +++++++++++++++---
 3 files changed, 87 insertions(+), 31 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 088f1fe..3e8a6e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -113,7 +113,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
             if (effective) {
               queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
               queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
-              planChangeLogger.log(rule.ruleName, plan, result)
+              planChangeLogger.logRule(rule.ruleName, plan, result)
             }
             queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
             queryExecutionMetrics.incNumExecution(rule.ruleName)
@@ -152,15 +152,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
         lastPlan = curPlan
       }
 
-      if (!batchStartPlan.fastEquals(curPlan)) {
-        logDebug(
-          s"""
-            |=== Result of Batch ${batch.name} ===
-            |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
-          """.stripMargin)
-      } else {
-        logTrace(s"Batch ${batch.name} has no effect.")
-      }
+      planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
     }
 
     curPlan
@@ -172,21 +164,46 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
 
     private val logRules = SQLConf.get.optimizerPlanChangeRules.map(Utils.stringToSeq)
 
-    def log(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
+    private val logBatches = SQLConf.get.optimizerPlanChangeBatches.map(Utils.stringToSeq)
+
+    def logRule(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
       if (logRules.isEmpty || logRules.get.contains(ruleName)) {
-        lazy val message =
+        def message(): String = {
           s"""
              |=== Applying Rule ${ruleName} ===
              |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
            """.stripMargin
-        logLevel match {
-          case "TRACE" => logTrace(message)
-          case "DEBUG" => logDebug(message)
-          case "INFO" => logInfo(message)
-          case "WARN" => logWarning(message)
-          case "ERROR" => logError(message)
-          case _ => logTrace(message)
         }
+
+        logBasedOnLevel(message)
+      }
+    }
+
+    def logBatch(batchName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
+      if (logBatches.isEmpty || logBatches.get.contains(batchName)) {
+        def message(): String = {
+          if (!oldPlan.fastEquals(newPlan)) {
+            s"""
+               |=== Result of Batch ${batchName} ===
+               |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
+             """.stripMargin
+          } else {
+            s"Batch ${batchName} has no effect."
+          }
+        }
+
+        logBasedOnLevel(message)
+      }
+    }
+
+    private def logBasedOnLevel(f: => String): Unit = {
+      logLevel match {
+        case "TRACE" => logTrace(f)
+        case "DEBUG" => logDebug(f)
+        case "INFO" => logInfo(f)
+        case "WARN" => logWarning(f)
+        case "ERROR" => logError(f)
+        case _ => logTrace(f)
       }
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 157be1b..f33cc86 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -184,8 +184,8 @@ object SQLConf {
   val OPTIMIZER_PLAN_CHANGE_LOG_LEVEL = buildConf("spark.sql.optimizer.planChangeLog.level")
     .internal()
     .doc("Configures the log level for logging the change from the original plan to the new " +
-      "plan after a rule is applied. The value can be 'trace', 'debug', 'info', 'warn', or " +
-      "'error'. The default log level is 'trace'.")
+      "plan after a rule or batch is applied. The value can be 'trace', 'debug', 'info', " +
+      "'warn', or 'error'. The default log level is 'trace'.")
     .stringConf
     .transform(_.toUpperCase(Locale.ROOT))
     .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
@@ -195,9 +195,15 @@ object SQLConf {
 
   val OPTIMIZER_PLAN_CHANGE_LOG_RULES = buildConf("spark.sql.optimizer.planChangeLog.rules")
     .internal()
-    .doc("If this configuration is set, the optimizer will only log plan changes caused by " +
-      "applying the rules specified in this configuration. The value can be a list of rule " +
-      "names separated by comma.")
+    .doc("Configures a list of rules to be logged in the optimizer, in which the rules are " +
+      "specified by their rule names and separated by comma.")
+    .stringConf
+    .createOptional
+
+  val OPTIMIZER_PLAN_CHANGE_LOG_BATCHES = buildConf("spark.sql.optimizer.planChangeLog.batches")
+    .internal()
+    .doc("Configures a list of batches to be logged in the optimizer, in which the batches " +
+      "are specified by their batch names and separated by comma.")
     .stringConf
     .createOptional
 
@@ -1763,6 +1769,8 @@ class SQLConf extends Serializable with Logging {
 
   def optimizerPlanChangeRules: Option[String] = getConf(OPTIMIZER_PLAN_CHANGE_LOG_RULES)
 
+  def optimizerPlanChangeBatches: Option[String] = getConf(OPTIMIZER_PLAN_CHANGE_LOG_BATCHES)
+
   def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)
 
   def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
index 3e9b453..dd7e29d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
@@ -32,17 +32,20 @@ import org.apache.spark.sql.internal.SQLConf
 class OptimizerLoggingSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches = Batch("Optimizer Batch", FixedPoint(100),
-      PushDownPredicate,
-      ColumnPruning,
-      CollapseProject) :: Nil
+    val batches =
+      Batch("Optimizer Batch", FixedPoint(100),
+        PushDownPredicate, ColumnPruning, CollapseProject) ::
+      Batch("Batch Has No Effect", Once,
+        ColumnPruning) :: Nil
   }
 
   class MockAppender extends AppenderSkeleton {
     val loggingEvents = new ArrayBuffer[LoggingEvent]()
 
     override def append(loggingEvent: LoggingEvent): Unit = {
-      if (loggingEvent.getRenderedMessage().contains("Applying Rule")) {
+      if (loggingEvent.getRenderedMessage().contains("Applying Rule") ||
+        loggingEvent.getRenderedMessage().contains("Result of Batch") ||
+        loggingEvent.getRenderedMessage().contains("has no effect")) {
         loggingEvents.append(loggingEvent)
       }
     }
@@ -51,7 +54,18 @@ class OptimizerLoggingSuite extends PlanTest {
     override def requiresLayout(): Boolean = false
   }
 
-  private def verifyLog(expectedLevel: Level, expectedRules: Seq[String]): Unit = {
+  private def withLogLevelAndAppender(level: Level, appender: Appender)(f: => Unit): Unit = {
+    val logger = Logger.getLogger(Optimize.getClass.getName.dropRight(1))
+    val restoreLevel = logger.getLevel
+    logger.setLevel(level)
+    logger.addAppender(appender)
+    try f finally {
+      logger.setLevel(restoreLevel)
+      logger.removeAppender(appender)
+    }
+  }
+
+  private def verifyLog(expectedLevel: Level, expectedRulesOrBatches: Seq[String]): Unit = {
     val logAppender = new MockAppender()
     withLogAppender(logAppender,
         loggerName = Some(Optimize.getClass.getName.dropRight(1)), level = Some(Level.TRACE)) {
@@ -61,7 +75,8 @@ class OptimizerLoggingSuite extends PlanTest {
       comparePlans(Optimize.execute(query), expected)
     }
     val logMessages = logAppender.loggingEvents.map(_.getRenderedMessage)
-    assert(expectedRules.forall(rule => logMessages.exists(_.contains(rule))))
+    assert(expectedRulesOrBatches.forall(
+      ruleOrBatch => logMessages.exists(_.contains(ruleOrBatch))))
     assert(logAppender.loggingEvents.forall(_.getLevel == expectedLevel))
   }
 
@@ -135,4 +150,20 @@ class OptimizerLoggingSuite extends PlanTest {
       }
     }
   }
+
+  test("test log batches which change the plan") {
+    withSQLConf(
+      SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_BATCHES.key -> "Optimizer Batch",
+      SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> "INFO") {
+      verifyLog(Level.INFO, Seq("Optimizer Batch"))
+    }
+  }
+
+  test("test log batches which do not change the plan") {
+    withSQLConf(
+      SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_BATCHES.key -> "Batch Has No Effect",
+      SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> "INFO") {
+      verifyLog(Level.INFO, Seq("Batch Has No Effect"))
+    }
+  }
 }
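
A note on the pattern in the RuleExecutor hunk above: logBasedOnLevel takes its message as a by-name parameter (f: => String), so the side-by-side plan string is built only when the underlying logger actually emits at the matched level. A self-contained sketch of that pattern, independent of Spark (LazyLogDemo and its members are illustrative names, not Spark APIs):

    // Minimal model of PlanChangeLogger.logBasedOnLevel: the message is
    // passed by name and constructed only if the target level is enabled.
    object LazyLogDemo {
      var traceEnabled = false

      private def logTrace(msg: => String): Unit = {
        if (traceEnabled) println(s"TRACE: $msg") // msg is only evaluated here
      }

      private def logInfo(msg: => String): Unit = println(s"INFO: $msg")

      def logBasedOnLevel(level: String)(f: => String): Unit = level match {
        case "TRACE" => logTrace(f)
        case "INFO" => logInfo(f)
        case _ => logTrace(f) // like the patch, unknown levels fall back to trace
      }

      def main(args: Array[String]): Unit = {
        // Tracing disabled: the by-name argument below is never evaluated.
        logBasedOnLevel("TRACE") { sys.error("never evaluated"); "unused" }
        traceEnabled = true
        logBasedOnLevel("TRACE")("plan diff rendered lazily") // prints TRACE: ...
      }
    }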

