You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2019/04/11 01:02:45 UTC
[spark] branch master updated: [SPARK-27088][SQL] Add a
configuration to set log level for each batch at RuleExecutor
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0745333 [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
0745333 is described below
commit 074533334d01afdd7862a1ac6c5a7a672bcce3f8
Author: chakravarthiT <45...@users.noreply.github.com>
AuthorDate: Thu Apr 11 10:02:27 2019 +0900
[SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
## What changes were proposed in this pull request?
Similar to #22406, which made the log level for plan changes by each rule configurable, this PR makes the log level for plan changes by each batch configurable, and it reuses the same configuration: "spark.sql.optimizer.planChangeLog.level".
Config proposed in this PR:
spark.sql.optimizer.planChangeLog.batches - enable plan change logging only for a set of specified batches, separated by commas.
## How was this patch tested?
Added UT, and also tested manually; screenshots are attached below.
1) Setting spark.sql.optimizer.planChangeLog.level to warn.
![settingLogLevelToWarn](https://user-images.githubusercontent.com/45845595/54556730-8803dd00-49df-11e9-95ab-ebb0c8d735ef.png)
2) Setting spark.sql.optimizer.planChangeLog.batches to Resolution and Subquery.
![settingBatchestoLog](https://user-images.githubusercontent.com/45845595/54556740-8cc89100-49df-11e9-80ab-fbbbe1ff2cdf.png)
3) Plan change logging enabled only for a set of specified batches (Resolution and Subquery).
![batchloggingOp](https://user-images.githubusercontent.com/45845595/54556788-ab2e8c80-49df-11e9-9ae0-57815f552896.png)
Closes #24136 from chakravarthiT/logBatches.
Lead-authored-by: chakravarthiT <45...@users.noreply.github.com>
Co-authored-by: chakravarthiT <tc...@gmail.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../spark/sql/catalyst/rules/RuleExecutor.scala | 55 ++++++++++++++--------
.../org/apache/spark/sql/internal/SQLConf.scala | 18 +++++--
.../catalyst/optimizer/OptimizerLoggingSuite.scala | 45 +++++++++++++++---
3 files changed, 87 insertions(+), 31 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 088f1fe..3e8a6e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -113,7 +113,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
if (effective) {
queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
- planChangeLogger.log(rule.ruleName, plan, result)
+ planChangeLogger.logRule(rule.ruleName, plan, result)
}
queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
queryExecutionMetrics.incNumExecution(rule.ruleName)
@@ -152,15 +152,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
lastPlan = curPlan
}
- if (!batchStartPlan.fastEquals(curPlan)) {
- logDebug(
- s"""
- |=== Result of Batch ${batch.name} ===
- |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
- """.stripMargin)
- } else {
- logTrace(s"Batch ${batch.name} has no effect.")
- }
+ planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
}
curPlan
@@ -172,21 +164,46 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
private val logRules = SQLConf.get.optimizerPlanChangeRules.map(Utils.stringToSeq)
- def log(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
+ private val logBatches = SQLConf.get.optimizerPlanChangeBatches.map(Utils.stringToSeq)
+
+ def logRule(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
if (logRules.isEmpty || logRules.get.contains(ruleName)) {
- lazy val message =
+ def message(): String = {
s"""
|=== Applying Rule ${ruleName} ===
|${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
""".stripMargin
- logLevel match {
- case "TRACE" => logTrace(message)
- case "DEBUG" => logDebug(message)
- case "INFO" => logInfo(message)
- case "WARN" => logWarning(message)
- case "ERROR" => logError(message)
- case _ => logTrace(message)
}
+
+ logBasedOnLevel(message)
+ }
+ }
+
+ def logBatch(batchName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
+ if (logBatches.isEmpty || logBatches.get.contains(batchName)) {
+ def message(): String = {
+ if (!oldPlan.fastEquals(newPlan)) {
+ s"""
+ |=== Result of Batch ${batchName} ===
+ |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
+ """.stripMargin
+ } else {
+ s"Batch ${batchName} has no effect."
+ }
+ }
+
+ logBasedOnLevel(message)
+ }
+ }
+
+ private def logBasedOnLevel(f: => String): Unit = {
+ logLevel match {
+ case "TRACE" => logTrace(f)
+ case "DEBUG" => logDebug(f)
+ case "INFO" => logInfo(f)
+ case "WARN" => logWarning(f)
+ case "ERROR" => logError(f)
+ case _ => logTrace(f)
}
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 157be1b..f33cc86 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -184,8 +184,8 @@ object SQLConf {
val OPTIMIZER_PLAN_CHANGE_LOG_LEVEL = buildConf("spark.sql.optimizer.planChangeLog.level")
.internal()
.doc("Configures the log level for logging the change from the original plan to the new " +
- "plan after a rule is applied. The value can be 'trace', 'debug', 'info', 'warn', or " +
- "'error'. The default log level is 'trace'.")
+ "plan after a rule or batch is applied. The value can be 'trace', 'debug', 'info', " +
+ "'warn', or 'error'. The default log level is 'trace'.")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
@@ -195,9 +195,15 @@ object SQLConf {
val OPTIMIZER_PLAN_CHANGE_LOG_RULES = buildConf("spark.sql.optimizer.planChangeLog.rules")
.internal()
- .doc("If this configuration is set, the optimizer will only log plan changes caused by " +
- "applying the rules specified in this configuration. The value can be a list of rule " +
- "names separated by comma.")
+ .doc("Configures a list of rules to be logged in the optimizer, in which the rules are " +
+ "specified by their rule names and separated by comma.")
+ .stringConf
+ .createOptional
+
+ val OPTIMIZER_PLAN_CHANGE_LOG_BATCHES = buildConf("spark.sql.optimizer.planChangeLog.batches")
+ .internal()
+ .doc("Configures a list of batches to be logged in the optimizer, in which the batches " +
+ "are specified by their batch names and separated by comma.")
.stringConf
.createOptional
@@ -1763,6 +1769,8 @@ class SQLConf extends Serializable with Logging {
def optimizerPlanChangeRules: Option[String] = getConf(OPTIMIZER_PLAN_CHANGE_LOG_RULES)
+ def optimizerPlanChangeBatches: Option[String] = getConf(OPTIMIZER_PLAN_CHANGE_LOG_BATCHES)
+
def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)
def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
index 3e9b453..dd7e29d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
@@ -32,17 +32,20 @@ import org.apache.spark.sql.internal.SQLConf
class OptimizerLoggingSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
- val batches = Batch("Optimizer Batch", FixedPoint(100),
- PushDownPredicate,
- ColumnPruning,
- CollapseProject) :: Nil
+ val batches =
+ Batch("Optimizer Batch", FixedPoint(100),
+ PushDownPredicate, ColumnPruning, CollapseProject) ::
+ Batch("Batch Has No Effect", Once,
+ ColumnPruning) :: Nil
}
class MockAppender extends AppenderSkeleton {
val loggingEvents = new ArrayBuffer[LoggingEvent]()
override def append(loggingEvent: LoggingEvent): Unit = {
- if (loggingEvent.getRenderedMessage().contains("Applying Rule")) {
+ if (loggingEvent.getRenderedMessage().contains("Applying Rule") ||
+ loggingEvent.getRenderedMessage().contains("Result of Batch") ||
+ loggingEvent.getRenderedMessage().contains("has no effect")) {
loggingEvents.append(loggingEvent)
}
}
@@ -51,7 +54,18 @@ class OptimizerLoggingSuite extends PlanTest {
override def requiresLayout(): Boolean = false
}
- private def verifyLog(expectedLevel: Level, expectedRules: Seq[String]): Unit = {
+ private def withLogLevelAndAppender(level: Level, appender: Appender)(f: => Unit): Unit = {
+ val logger = Logger.getLogger(Optimize.getClass.getName.dropRight(1))
+ val restoreLevel = logger.getLevel
+ logger.setLevel(level)
+ logger.addAppender(appender)
+ try f finally {
+ logger.setLevel(restoreLevel)
+ logger.removeAppender(appender)
+ }
+ }
+
+ private def verifyLog(expectedLevel: Level, expectedRulesOrBatches: Seq[String]): Unit = {
val logAppender = new MockAppender()
withLogAppender(logAppender,
loggerName = Some(Optimize.getClass.getName.dropRight(1)), level = Some(Level.TRACE)) {
@@ -61,7 +75,8 @@ class OptimizerLoggingSuite extends PlanTest {
comparePlans(Optimize.execute(query), expected)
}
val logMessages = logAppender.loggingEvents.map(_.getRenderedMessage)
- assert(expectedRules.forall(rule => logMessages.exists(_.contains(rule))))
+ assert(expectedRulesOrBatches.forall
+ (ruleOrBatch => logMessages.exists(_.contains(ruleOrBatch))))
assert(logAppender.loggingEvents.forall(_.getLevel == expectedLevel))
}
@@ -135,4 +150,20 @@ class OptimizerLoggingSuite extends PlanTest {
}
}
}
+
+ test("test log batches which change the plan") {
+ withSQLConf(
+ SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_BATCHES.key -> "Optimizer Batch",
+ SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> "INFO") {
+ verifyLog(Level.INFO, Seq("Optimizer Batch"))
+ }
+ }
+
+ test("test log batches which do not change the plan") {
+ withSQLConf(
+ SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_BATCHES.key -> "Batch Has No Effect",
+ SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> "INFO") {
+ verifyLog(Level.INFO, Seq("Batch Has No Effect"))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org