You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/04/03 06:55:49 UTC

[spark] branch branch-3.0 updated: [SPARK-30840][CORE][SQL] Add version property for ConfigEntry and ConfigBuilder

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 015f066  [SPARK-30840][CORE][SQL] Add version property for ConfigEntry and ConfigBuilder
015f066 is described below

commit 015f066acbed0a98000f3ccf9c20ddc49fdfb2f6
Author: beliefer <be...@163.com>
AuthorDate: Sat Feb 22 09:46:42 2020 +0900

    [SPARK-30840][CORE][SQL] Add version property for ConfigEntry and ConfigBuilder
    
    ### What changes were proposed in this pull request?
    Spark `ConfigEntry` and `ConfigBuilder` missing Spark version information of each configuration at release. This is not good for Spark user when they visiting the page of spark configuration.
    http://spark.apache.org/docs/latest/configuration.html
    The new Spark SQL config docs looks like:
    ![sql配置截屏](https://user-images.githubusercontent.com/8486025/74604522-cb882f00-50f9-11ea-8683-57a90f9e3347.png)
    
    ```
    > SET -v
    spark.sql.adaptive.enabled      false   When true, enable adaptive query execution.
    spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin       0.2     The relation with a non-empty partition ratio lower than this config will not be considered as the build side of a broadcast-hash join in adaptive execution regardless of its size.This configuration only has an effect when 'spark.sql.adaptive.enabled' is enabled.
    spark.sql.adaptive.optimizeSkewedJoin.enabled   true    When true and adaptive execution is enabled, a skewed join is automatically handled at runtime.
    spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionFactor     10      A partition is considered as a skewed partition if its size is larger than this factor multiple the median partition size and also larger than  spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionSizeThreshold
    spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionMaxSplits  5       Configures the maximum number of task to handle a skewed partition in adaptive skewedjoin.
    spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionSizeThreshold      64MB    Configures the minimum size in bytes for a partition that is considered as a skewed partition in adaptive skewed join.
    spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled    true    Whether to fetch the continuous shuffle blocks in batch. Instead of fetching blocks one by one, fetching continuous shuffle blocks for the same map task in batch can reduce IO and improve performance. Note, multiple continuous blocks exist in single fetch request only happen when 'spark.sql.adaptive.enabled' and 'spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled' is enabled, this feature also depends  [...]
    spark.sql.adaptive.shuffle.localShuffleReader.enabled   true    When true and 'spark.sql.adaptive.enabled' is enabled, this enables the optimization of converting the shuffle reader to local shuffle reader for the shuffle exchange of the broadcast hash join in probe side.
    spark.sql.adaptive.shuffle.maxNumPostShufflePartitions  <undefined>     The advisory maximum number of post-shuffle partitions used in adaptive execution. This is used as the initial number of pre-shuffle partitions. By default it equals to spark.sql.shuffle.partitions. This configuration only has an effect when 'spark.sql.adaptive.enabled' and 'spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled' is enabled.
    ```
    
    **Note**: Because there are so many configuration items that are exposed and require a lot of finishing, I will add the version numbers of these configuration items in another PR.
    
    ### Why are the changes needed?
    Supplemental configuration version information.
    
    ### Does this PR introduce any user-facing change?
    Yes
    
    ### How was this patch tested?
    Exists UT
    
    Closes #27592 from beliefer/add-version-to-config.
    
    Authored-by: beliefer <be...@163.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../spark/internal/config/ConfigBuilder.scala      | 16 ++++++----
 .../apache/spark/internal/config/ConfigEntry.scala | 35 +++++++++++++++-------
 .../org/apache/spark/sql/internal/SQLConf.scala    |  4 +--
 .../spark/sql/api/python/PythonSQLUtils.scala      |  2 +-
 .../spark/sql/execution/command/SetCommand.scala   | 11 +++++--
 .../apache/spark/sql/internal/SQLConfSuite.scala   |  8 ++---
 sql/gen-sql-config-docs.py                         | 10 +++++--
 7 files changed, 57 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index 68e1994..8d5959a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -129,7 +129,7 @@ private[spark] class TypedConfigBuilder[T](
   def createOptional: OptionalConfigEntry[T] = {
     val entry = new OptionalConfigEntry[T](parent.key, parent._prependedKey,
       parent._prependSeparator, parent._alternatives, converter, stringConverter, parent._doc,
-      parent._public)
+      parent._public, parent._version)
     parent._onCreate.foreach(_(entry))
     entry
   }
@@ -144,7 +144,7 @@ private[spark] class TypedConfigBuilder[T](
       val transformedDefault = converter(stringConverter(default))
       val entry = new ConfigEntryWithDefault[T](parent.key, parent._prependedKey,
         parent._prependSeparator, parent._alternatives, transformedDefault, converter,
-        stringConverter, parent._doc, parent._public)
+        stringConverter, parent._doc, parent._public, parent._version)
       parent._onCreate.foreach(_(entry))
       entry
     }
@@ -154,7 +154,7 @@ private[spark] class TypedConfigBuilder[T](
   def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = {
     val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._prependedKey,
       parent._prependSeparator, parent._alternatives, defaultFunc, converter, stringConverter,
-      parent._doc, parent._public)
+      parent._doc, parent._public, parent._version)
     parent._onCreate.foreach(_ (entry))
     entry
   }
@@ -166,7 +166,7 @@ private[spark] class TypedConfigBuilder[T](
   def createWithDefaultString(default: String): ConfigEntry[T] = {
     val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._prependedKey,
       parent._prependSeparator, parent._alternatives, default, converter, stringConverter,
-      parent._doc, parent._public)
+      parent._doc, parent._public, parent._version)
     parent._onCreate.foreach(_(entry))
     entry
   }
@@ -186,6 +186,7 @@ private[spark] case class ConfigBuilder(key: String) {
   private[config] var _prependSeparator: String = ""
   private[config] var _public = true
   private[config] var _doc = ""
+  private[config] var _version = ""
   private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
   private[config] var _alternatives = List.empty[String]
 
@@ -199,6 +200,11 @@ private[spark] case class ConfigBuilder(key: String) {
     this
   }
 
+  def version(v: String): ConfigBuilder = {
+    _version = v
+    this
+  }
+
   /**
    * Registers a callback for when the config entry is finally instantiated. Currently used by
    * SQLConf to keep track of SQL configuration entries.
@@ -255,7 +261,7 @@ private[spark] case class ConfigBuilder(key: String) {
 
   def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
     val entry = new FallbackConfigEntry(key, _prependedKey, _prependSeparator, _alternatives, _doc,
-      _public, fallback)
+      _public, _version, fallback)
     _onCreate.foreach(_(entry))
     entry
   }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
index c5df4c8..8c0b11d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -39,6 +39,7 @@ package org.apache.spark.internal.config
  * @param doc the documentation for the configuration
  * @param isPublic if this configuration is public to the user. If it's `false`, this
  *                 configuration is only used internally and we should not expose it to users.
+ * @param version the spark version when the configuration was released.
  * @tparam T the value type
  */
 private[spark] abstract class ConfigEntry[T] (
@@ -49,7 +50,8 @@ private[spark] abstract class ConfigEntry[T] (
     val valueConverter: String => T,
     val stringConverter: T => String,
     val doc: String,
-    val isPublic: Boolean) {
+    val isPublic: Boolean,
+    val version: String) {
 
   import ConfigEntry._
 
@@ -74,7 +76,8 @@ private[spark] abstract class ConfigEntry[T] (
   def defaultValue: Option[T] = None
 
   override def toString: String = {
-    s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
+    s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, " +
+      s"public=$isPublic, version=$version)"
   }
 }
 
@@ -87,7 +90,8 @@ private class ConfigEntryWithDefault[T] (
     valueConverter: String => T,
     stringConverter: T => String,
     doc: String,
-    isPublic: Boolean)
+    isPublic: Boolean,
+    version: String)
   extends ConfigEntry(
     key,
     prependedKey,
@@ -96,7 +100,8 @@ private class ConfigEntryWithDefault[T] (
     valueConverter,
     stringConverter,
     doc,
-    isPublic
+    isPublic,
+    version
   ) {
 
   override def defaultValue: Option[T] = Some(_defaultValue)
@@ -117,7 +122,8 @@ private class ConfigEntryWithDefaultFunction[T] (
     valueConverter: String => T,
     stringConverter: T => String,
     doc: String,
-    isPublic: Boolean)
+    isPublic: Boolean,
+    version: String)
   extends ConfigEntry(
     key,
     prependedKey,
@@ -126,7 +132,8 @@ private class ConfigEntryWithDefaultFunction[T] (
     valueConverter,
     stringConverter,
     doc,
-    isPublic
+    isPublic,
+    version
   ) {
 
   override def defaultValue: Option[T] = Some(_defaultFunction())
@@ -147,7 +154,8 @@ private class ConfigEntryWithDefaultString[T] (
     valueConverter: String => T,
     stringConverter: T => String,
     doc: String,
-    isPublic: Boolean)
+    isPublic: Boolean,
+    version: String)
   extends ConfigEntry(
     key,
     prependedKey,
@@ -156,7 +164,8 @@ private class ConfigEntryWithDefaultString[T] (
     valueConverter,
     stringConverter,
     doc,
-    isPublic
+    isPublic,
+    version
   ) {
 
   override def defaultValue: Option[T] = Some(valueConverter(_defaultValue))
@@ -181,7 +190,8 @@ private[spark] class OptionalConfigEntry[T](
     val rawValueConverter: String => T,
     val rawStringConverter: T => String,
     doc: String,
-    isPublic: Boolean)
+    isPublic: Boolean,
+    version: String)
   extends ConfigEntry[Option[T]](
     key,
     prependedKey,
@@ -190,7 +200,8 @@ private[spark] class OptionalConfigEntry[T](
     s => Some(rawValueConverter(s)),
     v => v.map(rawStringConverter).orNull,
     doc,
-    isPublic
+    isPublic,
+    version
   ) {
 
   override def defaultValueString: String = ConfigEntry.UNDEFINED
@@ -210,6 +221,7 @@ private[spark] class FallbackConfigEntry[T] (
     alternatives: List[String],
     doc: String,
     isPublic: Boolean,
+    version: String,
     val fallback: ConfigEntry[T])
   extends ConfigEntry[T](
     key,
@@ -219,7 +231,8 @@ private[spark] class FallbackConfigEntry[T] (
     fallback.valueConverter,
     fallback.stringConverter,
     doc,
-    isPublic
+    isPublic,
+    version
   ) {
 
   override def defaultValueString: String = s"<value of ${fallback.key}>"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 44f3037..9f9e556 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2982,10 +2982,10 @@ class SQLConf extends Serializable with Logging {
    * Return all the configuration definitions that have been defined in [[SQLConf]]. Each
    * definition contains key, defaultValue and doc.
    */
-  def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized {
+  def getAllDefinedConfs: Seq[(String, String, String, String)] = sqlConfEntries.synchronized {
     sqlConfEntries.values.asScala.filter(_.isPublic).map { entry =>
       val displayValue = Option(getConfString(entry.key, null)).getOrElse(entry.defaultValueString)
-      (entry.key, displayValue, entry.doc)
+      (entry.key, displayValue, entry.doc, entry.version)
     }.toSeq
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index bf3055d..03f5a60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -40,7 +40,7 @@ private[sql] object PythonSQLUtils {
     FunctionRegistry.functionSet.flatMap(f => FunctionRegistry.builtin.lookupFunction(f)).toArray
   }
 
-  def listSQLConfigs(): Array[(String, String, String)] = {
+  def listSQLConfigs(): Array[(String, String, String, String)] = {
     val conf = new SQLConf()
     // Py4J doesn't seem to translate Seq well, so we convert to an Array.
     conf.getAllDefinedConfs.toArray
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index a12b261..3dc1d52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -115,14 +115,19 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
     case Some(("-v", None)) =>
       val runFunc = (sparkSession: SparkSession) => {
         sparkSession.sessionState.conf.getAllDefinedConfs.sorted.map {
-          case (key, defaultValue, doc) =>
-            Row(key, Option(defaultValue).getOrElse("<undefined>"), doc)
+          case (key, defaultValue, doc, version) =>
+            Row(
+              key,
+              Option(defaultValue).getOrElse("<undefined>"),
+              doc,
+              Option(version).getOrElse("<unknown>"))
         }
       }
       val schema = StructType(
         StructField("key", StringType, nullable = false) ::
           StructField("value", StringType, nullable = false) ::
-          StructField("meaning", StringType, nullable = false) :: Nil)
+          StructField("meaning", StringType, nullable = false) ::
+          StructField("Since version", StringType, nullable = false) :: Nil)
       (schema.toAttributes, runFunc)
 
     // Queries the deprecated "mapred.reduce.tasks" property.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index c2d8493d..f389465 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -301,8 +301,8 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     assert(spark.sessionState.conf.getConfString(fallback.key, "lzo") === "lzo")
 
     val displayValue = spark.sessionState.conf.getAllDefinedConfs
-      .find { case (key, _, _) => key == fallback.key }
-      .map { case (_, v, _) => v }
+      .find { case (key, _, _, _) => key == fallback.key }
+      .map { case (_, v, _, _) => v }
       .get
     assert(displayValue === fallback.defaultValueString)
 
@@ -313,8 +313,8 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     assert(spark.sessionState.conf.getConfString(fallback.key) === "lzo")
 
     val newDisplayValue = spark.sessionState.conf.getAllDefinedConfs
-      .find { case (key, _, _) => key == fallback.key }
-      .map { case (_, v, _) => v }
+      .find { case (key, _, _, _) => key == fallback.key }
+      .map { case (_, v, _, _) => v }
       .get
     assert(newDisplayValue === "lzo")
 
diff --git a/sql/gen-sql-config-docs.py b/sql/gen-sql-config-docs.py
index 04f5a85..98212ad 100644
--- a/sql/gen-sql-config-docs.py
+++ b/sql/gen-sql-config-docs.py
@@ -25,7 +25,7 @@ from mkdocs.structure.pages import markdown
 from pyspark.java_gateway import launch_gateway
 
 SQLConfEntry = namedtuple(
-    "SQLConfEntry", ["name", "default", "description"])
+    "SQLConfEntry", ["name", "default", "description", "version"])
 
 
 def get_public_sql_configs(jvm):
@@ -34,6 +34,7 @@ def get_public_sql_configs(jvm):
             name=_sql_config._1(),
             default=_sql_config._2(),
             description=_sql_config._3(),
+            version=_sql_config._4()
         )
         for _sql_config in jvm.org.apache.spark.sql.api.python.PythonSQLUtils.listSQLConfigs()
     ]
@@ -49,12 +50,13 @@ def generate_sql_configs_table(sql_configs, path):
 
     ```html
     <table class="table">
-    <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+    <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
 
     <tr>
         <td><code>spark.sql.adaptive.enabled</code></td>
         <td>false</td>
         <td><p>When true, enable adaptive query execution.</p></td>
+        <td>2.1.0</td>
     </tr>
 
     ...
@@ -68,7 +70,7 @@ def generate_sql_configs_table(sql_configs, path):
         f.write(dedent(
             """
             <table class="table">
-            <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+            <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
             """
         ))
         for config in sorted(sql_configs, key=lambda x: x.name):
@@ -96,12 +98,14 @@ def generate_sql_configs_table(sql_configs, path):
                     <td><code>{name}</code></td>
                     <td>{default}</td>
                     <td>{description}</td>
+                    <td>{version}</td>
                 </tr>
                 """
                 .format(
                     name=config.name,
                     default=default,
                     description=markdown.markdown(config.description),
+                    version=config.version
                 )
             ))
         f.write("</table>\n")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org