Posted to commits@spark.apache.org by we...@apache.org on 2017/11/20 11:45:25 UTC

spark git commit: [SPARK-22533][CORE] Handle deprecated names in ConfigEntry.

Repository: spark
Updated Branches:
  refs/heads/master 3c3eebc87 -> c13b60e01


[SPARK-22533][CORE] Handle deprecated names in ConfigEntry.

This change hooks up the config reader to `SparkConf.getDeprecatedConfig`,
so that config constants with deprecated names generate the proper warnings.
It also moves two deprecated configs from the new "alternatives" system to
the old deprecation system, since the two systems are not yet hooked up to
each other.

Added a few unit tests to verify the desired behavior.
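
For context, the lookup being wired up behaves roughly like the standalone
sketch below. The names are illustrative stand-ins, not Spark's internal API;
the real table is SparkConf.configsWithAlternatives, shown in the diff.

    object DeprecatedLookup {
      // Mirrors the shape of SparkConf.AlternateConfig: a deprecated key, the
      // version that deprecated it, and an optional value translation.
      case class AlternateConfig(
          key: String,
          version: String,
          translation: String => String = null)

      val alternatives: Map[String, Seq[AlternateConfig]] = Map(
        "spark.history.fs.cleaner.maxAge" -> Seq(
          AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")))

      // Return the first value found under any deprecated name for `key`,
      // applying the alternate's translation if it defines one.
      def resolve(key: String, settings: Map[String, String]): Option[String] =
        alternatives.get(key).flatMap { alts =>
          alts.collectFirst { case alt if settings.contains(alt.key) =>
            val value = settings(alt.key)
            if (alt.translation != null) alt.translation(value) else value
          }
        }
    }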

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #19760 from vanzin/SPARK-22533.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c13b60e0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c13b60e0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c13b60e0

Branch: refs/heads/master
Commit: c13b60e0194c90156e74d10b19f94c70675d21ae
Parents: 3c3eebc
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Mon Nov 20 12:45:21 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Nov 20 12:45:21 2017 +0100

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/SparkConf.scala | 16 ++++++++++------
 .../spark/internal/config/ConfigProvider.scala      |  4 +++-
 .../org/apache/spark/internal/config/package.scala  |  2 --
 .../scala/org/apache/spark/SparkConfSuite.scala     |  7 +++++++
 4 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c13b60e0/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index ee726df..0e08ff6 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark
 
+import java.util.{Map => JMap}
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
@@ -24,6 +25,7 @@ import scala.collection.mutable.LinkedHashSet
 
 import org.apache.avro.{Schema, SchemaNormalization}
 
+import org.apache.spark.deploy.history.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.serializer.KryoSerializer
@@ -370,7 +372,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
 
   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
-    Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
+    Option(settings.get(key)).orElse(getDeprecatedConfig(key, settings))
   }
 
   /** Get an optional value, applying variable substitution. */
@@ -622,7 +624,7 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.history.updateInterval", "1.3")),
     "spark.history.fs.cleaner.interval" -> Seq(
       AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")),
-    "spark.history.fs.cleaner.maxAge" -> Seq(
+    MAX_LOG_AGE_S.key -> Seq(
       AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")),
     "spark.yarn.am.waitTime" -> Seq(
       AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
@@ -663,8 +665,10 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.yarn.jar", "2.0")),
     "spark.yarn.access.hadoopFileSystems" -> Seq(
       AlternateConfig("spark.yarn.access.namenodes", "2.2")),
-    "spark.maxRemoteBlockSizeFetchToMem" -> Seq(
-      AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3"))
+    MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
+      AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
+    LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
+      AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3"))
   )
 
   /**
@@ -704,9 +708,9 @@ private[spark] object SparkConf extends Logging {
    * Looks for available deprecated keys for the given config option, and return the first
    * value available.
    */
-  def getDeprecatedConfig(key: String, conf: SparkConf): Option[String] = {
+  def getDeprecatedConfig(key: String, conf: JMap[String, String]): Option[String] = {
     configsWithAlternatives.get(key).flatMap { alts =>
-      alts.collectFirst { case alt if conf.contains(alt.key) =>
+      alts.collectFirst { case alt if conf.containsKey(alt.key) =>
         val value = conf.get(alt.key)
         if (alt.translation != null) alt.translation(value) else value
       }
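
The signature change above exists because both call sites hold a raw
java.util.Map: SparkConf passes its internal `settings` map, and the provider
in the next file wraps a plain JMap. A minimal sketch of the containsKey/get
fallback pattern (standalone; `lookup` and the alias table are hypothetical
names, not Spark's API):

    import java.util.{HashMap => JHashMap, Map => JMap}

    // Answer from the map directly, else through a deprecated alias.
    def lookup(
        key: String,
        conf: JMap[String, String],
        aliases: Map[String, String]): Option[String] =
      Option(conf.get(key)).orElse {
        aliases.get(key).filter(conf.containsKey).map(conf.get)
      }

    val settings = new JHashMap[String, String]()
    settings.put("spark.reducer.maxReqSizeShuffleToMem", "1g")
    // The canonical key resolves through its deprecated alias:
    assert(lookup("spark.maxRemoteBlockSizeFetchToMem", settings,
      Map("spark.maxRemoteBlockSizeFetchToMem" ->
        "spark.reducer.maxReqSizeShuffleToMem")).contains("1g"))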

http://git-wip-us.apache.org/repos/asf/spark/blob/c13b60e0/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
index 5d98a11..392f9d5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
@@ -19,6 +19,8 @@ package org.apache.spark.internal.config
 
 import java.util.{Map => JMap}
 
+import org.apache.spark.SparkConf
+
 /**
  * A source of configuration values.
  */
@@ -53,7 +55,7 @@ private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends Con
 
   override def get(key: String): Option[String] = {
     if (key.startsWith("spark.")) {
-      Option(conf.get(key))
+      Option(conf.get(key)).orElse(SparkConf.getDeprecatedConfig(key, conf))
     } else {
       None
     }
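
This one-line fallback is the hook the commit message describes: typed config
constants are read through a ConfigProvider, so once SparkConfigProvider
consults the deprecation table, a value set under an old name reaches the new
ConfigEntry. A toy model of that flow (illustrative stand-ins; only the shape
of get matches the real trait):

    import java.util.{Map => JMap}

    trait Provider { def get(key: String): Option[String] }

    // Stand-in for SparkConfigProvider: answer from the underlying map,
    // then fall back to a deprecated-name resolver, for spark.* keys only.
    class MapProvider(
        conf: JMap[String, String],
        deprecated: (String, JMap[String, String]) => Option[String])
      extends Provider {
      override def get(key: String): Option[String] =
        if (key.startsWith("spark.")) {
          Option(conf.get(key)).orElse(deprecated(key, conf))
        } else {
          None
        }
    }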

http://git-wip-us.apache.org/repos/asf/spark/blob/c13b60e0/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7be4d6b..7a90727 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -209,7 +209,6 @@ package object config {
 
   private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY =
     ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity")
-      .withAlternative("spark.scheduler.listenerbus.eventqueue.size")
       .intConf
       .checkValue(_ > 0, "The capacity of listener bus event queue must not be negative")
       .createWithDefault(10000)
@@ -404,7 +403,6 @@ package object config {
         "affect both shuffle fetch and block manager remote block fetch. For users who " +
         "enabled external shuffle service, this feature can only be worked when external shuffle" +
         " service is newer than Spark 2.2.")
-      .withAlternative("spark.reducer.maxReqSizeShuffleToMem")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(Long.MaxValue)
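
The two .withAlternative lines removed here belonged to the newer
builder-level mechanism; the AlternateConfig entries added in SparkConf.scala
above re-register the same old names with the older table-based system, which
is the one that logs deprecation warnings. A sketch contrasting the two
shapes (stand-in types, not Spark's classes):

    // Builder-level: the alternate name travels with the typed entry.
    case class Entry(key: String, alternatives: Seq[String] = Nil) {
      def withAlternative(alt: String): Entry =
        copy(alternatives = alternatives :+ alt)
    }

    // Table-level: SparkConf owns the mapping (and the version metadata
    // used in the warning).
    case class AlternateConfig(key: String, version: String)

    val viaBuilder = Entry("spark.maxRemoteBlockSizeFetchToMem")
      .withAlternative("spark.reducer.maxReqSizeShuffleToMem")
    val viaTable = Map(
      "spark.maxRemoteBlockSizeFetchToMem" -> Seq(
        AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")))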
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c13b60e0/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 0897891..c771eb4 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -26,6 +26,7 @@ import scala.util.{Random, Try}
 
 import com.esotericsoftware.kryo.Kryo
 
+import org.apache.spark.deploy.history.config._
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer}
@@ -248,6 +249,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
 
     conf.set("spark.kryoserializer.buffer.mb", "1.1")
     assert(conf.getSizeAsKb("spark.kryoserializer.buffer") === 1100)
+
+    conf.set("spark.history.fs.cleaner.maxAge.seconds", "42")
+    assert(conf.get(MAX_LOG_AGE_S) === 42L)
+
+    conf.set("spark.scheduler.listenerbus.eventqueue.size", "84")
+    assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) === 84)
   }
 
   test("akka deprecated configs") {
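
End to end, the new assertions above exercise behavior along these lines
(a sketch, not verbatim test code: the constants are private[spark], so this
only compiles inside Spark's own sources, and the warning behavior is
paraphrased rather than quoted):

    val conf = new SparkConf()
    conf.set("spark.scheduler.listenerbus.eventqueue.size", "84")
    // set() should log a deprecation warning pointing at the replacement
    // key, spark.scheduler.listenerbus.eventqueue.capacity; the typed
    // constant then resolves through the deprecated name:
    assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) == 84)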

