Posted to commits@spark.apache.org by sr...@apache.org on 2015/04/17 12:06:06 UTC

spark git commit: [SPARK-6046] [core] Reorganize deprecated config support in SparkConf.

Repository: spark
Updated Branches:
  refs/heads/master f7a25644e -> 4527761bc


 [SPARK-6046] [core] Reorganize deprecated config support in SparkConf.

This change follows the approach chosen for handling deprecated
configs in SparkConf: all values (old and new) are kept in the conf
object, and newer names take precedence over older ones when a value
is retrieved.

Deprecation warnings are logged when such options are set, which generally
happens on the driver node (where the logs are most visible).
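
For illustration (not part of the commit), a minimal Scala sketch of this
precedence behavior; it assumes the "spark.history.fs.update.interval" key and
the deprecated alternates registered in the diff below:

    import org.apache.spark.SparkConf

    val conf = new SparkConf(false)  // skip loading values from system properties

    // Setting a deprecated key logs a warning (typically on the driver),
    // but the value is kept in the conf object.
    conf.set("spark.history.updateInterval", "15")

    // The value is visible through the new key via the deprecated-key fallback...
    assert(conf.get("spark.history.fs.update.interval") == "15")

    // ...until the new key itself is set, at which point it takes precedence.
    conf.set("spark.history.fs.update.interval", "30s")
    assert(conf.get("spark.history.fs.update.interval") == "30s")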

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #5514 from vanzin/SPARK-6046 and squashes the following commits:

9371529 [Marcelo Vanzin] Avoid math.
6cf3f11 [Marcelo Vanzin] Review feedback.
2445d48 [Marcelo Vanzin] Fix (and cleanup) update interval initialization.
b6824be [Marcelo Vanzin] Clean up the other deprecated config use also.
ab20351 [Marcelo Vanzin] Update FsHistoryProvider to only retrieve new config key.
2c93209 [Marcelo Vanzin] [SPARK-6046] [core] Reorganize deprecated config support in SparkConf.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4527761b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4527761b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4527761b

Branch: refs/heads/master
Commit: 4527761bcd6501c362baf2780905a0018b9a74ba
Parents: f7a2564
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Apr 17 11:06:01 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Apr 17 11:06:01 2015 +0100

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala | 174 ++++++++++---------
 .../deploy/history/FsHistoryProvider.scala      |   9 +-
 .../org/apache/spark/executor/Executor.scala    |   5 +-
 .../scala/org/apache/spark/SparkConfSuite.scala |  22 +++
 docs/monitoring.md                              |   6 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |   3 +-
 6 files changed, 124 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4527761b/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 390e631..b0186e9 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -68,6 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     if (value == null) {
       throw new NullPointerException("null value for " + key)
     }
+    logDeprecationWarning(key)
     settings.put(key, value)
     this
   }
@@ -134,13 +135,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Set multiple parameters together */
   def setAll(settings: Traversable[(String, String)]): SparkConf = {
-    this.settings.putAll(settings.toMap.asJava)
+    settings.foreach { case (k, v) => set(k, v) }
     this
   }
 
   /** Set a parameter if it isn't already configured */
   def setIfMissing(key: String, value: String): SparkConf = {
-    settings.putIfAbsent(key, value)
+    if (settings.putIfAbsent(key, value) == null) {
+      logDeprecationWarning(key)
+    }
     this
   }
 
@@ -174,8 +177,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     getOption(key).getOrElse(defaultValue)
   }
 
-  /** 
-   * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no 
+  /**
+   * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
    * suffix is provided then seconds are assumed.
    * @throws NoSuchElementException
    */
@@ -183,36 +186,35 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     Utils.timeStringAsSeconds(get(key))
   }
 
-  /** 
-   * Get a time parameter as seconds, falling back to a default if not set. If no 
+  /**
+   * Get a time parameter as seconds, falling back to a default if not set. If no
    * suffix is provided then seconds are assumed.
-   * 
    */
   def getTimeAsSeconds(key: String, defaultValue: String): Long = {
     Utils.timeStringAsSeconds(get(key, defaultValue))
   }
 
-  /** 
-   * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no 
-   * suffix is provided then milliseconds are assumed. 
+  /**
+   * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
+   * suffix is provided then milliseconds are assumed.
    * @throws NoSuchElementException
    */
   def getTimeAsMs(key: String): Long = {
     Utils.timeStringAsMs(get(key))
   }
 
-  /** 
-   * Get a time parameter as milliseconds, falling back to a default if not set. If no 
-   * suffix is provided then milliseconds are assumed. 
+  /**
+   * Get a time parameter as milliseconds, falling back to a default if not set. If no
+   * suffix is provided then milliseconds are assumed.
    */
   def getTimeAsMs(key: String, defaultValue: String): Long = {
     Utils.timeStringAsMs(get(key, defaultValue))
   }
-  
+
 
   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
-    Option(settings.get(key))
+    Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
   }
 
   /** Get all parameters as a list of pairs */
@@ -379,13 +381,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
         }
       }
     }
-
-    // Warn against the use of deprecated configs
-    deprecatedConfigs.values.foreach { dc =>
-      if (contains(dc.oldName)) {
-        dc.warn()
-      }
-    }
   }
 
   /**
@@ -400,19 +395,44 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
 private[spark] object SparkConf extends Logging {
 
+  /**
+   * Maps deprecated config keys to information about the deprecation.
+   *
+   * The extra information is logged as a warning when the config is present in the user's
+   * configuration.
+   */
   private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
     val configs = Seq(
-      DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
-        "1.3"),
-      DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
-        "Use spark.{driver,executor}.userClassPathFirst instead."),
-      DeprecatedConfig("spark.history.fs.updateInterval",
-        "spark.history.fs.update.interval.seconds",
-        "1.3", "Use spark.history.fs.update.interval.seconds instead"),
-      DeprecatedConfig("spark.history.updateInterval",
-        "spark.history.fs.update.interval.seconds",
-        "1.3", "Use spark.history.fs.update.interval.seconds instead"))
-    configs.map { x => (x.oldName, x) }.toMap
+      DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
+        "Please use spark.{driver,executor}.userClassPathFirst instead."))
+    Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
+  }
+
+  /**
+   * Maps a current config key to alternate keys that were used in previous versions of Spark.
+   *
+   * The alternates are used in the order defined in this map. If deprecated configs are
+   * present in the user's configuration, a warning is logged.
+   */
+  private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
+    "spark.executor.userClassPathFirst" -> Seq(
+      AlternateConfig("spark.files.userClassPathFirst", "1.3")),
+    "spark.history.fs.update.interval" -> Seq(
+      AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
+      AlternateConfig("spark.history.fs.updateInterval", "1.3"),
+      AlternateConfig("spark.history.updateInterval", "1.3"))
+    )
+
+  /**
+   * A view of `configsWithAlternatives` that makes it more efficient to look up deprecated
+   * config keys.
+   *
+   * Maps the deprecated config name to a 2-tuple (new config name, alternate config info).
+   */
+  private val allAlternatives: Map[String, (String, AlternateConfig)] = {
+    configsWithAlternatives.keys.flatMap { key =>
+      configsWithAlternatives(key).map { cfg => (cfg.key -> (key -> cfg)) }
+    }.toMap
   }
 
   /**
@@ -443,61 +463,57 @@ private[spark] object SparkConf extends Logging {
   }
 
   /**
-   * Translate the configuration key if it is deprecated and has a replacement, otherwise just
-   * returns the provided key.
-   *
-   * @param userKey Configuration key from the user / caller.
-   * @param warn Whether to print a warning if the key is deprecated. Warnings will be printed
-   *             only once for each key.
+   * Looks for available deprecated keys for the given config option, and returns the first
+   * value available.
    */
-  private def translateConfKey(userKey: String, warn: Boolean = false): String = {
-    deprecatedConfigs.get(userKey)
-      .map { deprecatedKey =>
-        if (warn) {
-          deprecatedKey.warn()
-        }
-        deprecatedKey.newName.getOrElse(userKey)
-      }.getOrElse(userKey)
+  def getDeprecatedConfig(key: String, conf: SparkConf): Option[String] = {
+    configsWithAlternatives.get(key).flatMap { alts =>
+      alts.collectFirst { case alt if conf.contains(alt.key) =>
+        val value = conf.get(alt.key)
+        alt.translation.map(_(value)).getOrElse(value)
+      }
+    }
   }
 
   /**
-   * Holds information about keys that have been deprecated or renamed.
+   * Logs a warning message if the given config key is deprecated.
+   */
+  def logDeprecationWarning(key: String): Unit = {
+    deprecatedConfigs.get(key).foreach { cfg =>
+      logWarning(
+        s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
+        s"may be removed in the future. ${cfg.deprecationMessage}")
+    }
+
+    allAlternatives.get(key).foreach { case (newKey, cfg) =>
+      logWarning(
+        s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
+        s"and may be removed in the future. Please use the new key '$newKey' instead.")
+    }
+  }
+
+  /**
+   * Holds information about keys that have been deprecated and do not have a replacement.
    *
-   * @param oldName Old configuration key.
-   * @param newName New configuration key, or `null` if key has no replacement, in which case the
-   *                deprecated key will be used (but the warning message will still be printed).
+   * @param key The deprecated key.
    * @param version Version of Spark where key was deprecated.
-   * @param deprecationMessage Message to include in the deprecation warning; mandatory when
-   *                           `newName` is not provided.
+   * @param deprecationMessage Message to include in the deprecation warning.
    */
   private case class DeprecatedConfig(
-      oldName: String,
-      _newName: String,
+      key: String,
       version: String,
-      deprecationMessage: String = null) {
-
-    private val warned = new AtomicBoolean(false)
-    val newName = Option(_newName)
+      deprecationMessage: String)
 
-    if (newName == null && (deprecationMessage == null || deprecationMessage.isEmpty())) {
-      throw new IllegalArgumentException("Need new config name or deprecation message.")
-    }
-
-    def warn(): Unit = {
-      if (warned.compareAndSet(false, true)) {
-        if (newName != null) {
-          val message = Option(deprecationMessage).getOrElse(
-            s"Please use the alternative '$newName' instead.")
-          logWarning(
-            s"The configuration option '$oldName' has been replaced as of Spark $version and " +
-            s"may be removed in the future. $message")
-        } else {
-          logWarning(
-            s"The configuration option '$oldName' has been deprecated as of Spark $version and " +
-            s"may be removed in the future. $deprecationMessage")
-        }
-      }
-    }
+  /**
+   * Information about an alternate configuration key that has been deprecated.
+   *
+   * @param key The deprecated config key.
+   * @param version The Spark version in which the key was deprecated.
+   * @param translation A translation function for converting old config values into new ones.
+   */
+  private case class AlternateConfig(
+      key: String,
+      version: String,
+      translation: Option[String => String] = None)
 
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4527761b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 9d40d8c..9855457 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -49,11 +49,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private val NOT_STARTED = "<Not Started>"
 
   // Interval between each check for event log updates
-  private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
-    .orElse(conf.getOption("spark.history.fs.updateInterval"))
-    .orElse(conf.getOption("spark.history.updateInterval"))
-    .map(_.toInt)
-    .getOrElse(10) * 1000
+  private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
 
   // Interval between each cleaner checks for event logs to delete
   private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleaner.interval.seconds",
@@ -130,8 +126,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     // Disable the background thread during tests.
     if (!conf.contains("spark.testing")) {
       // A task that periodically checks for event log updates on disk.
-      pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS,
-        TimeUnit.MILLISECONDS)
+      pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
 
       if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
         // A task that periodically cleans event logs on disk.
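
As an aside (not part of the diff), a hedged Scala sketch of how the new
suffix-aware getter behaves, assuming unsuffixed values are parsed as seconds,
as the getTimeAsSeconds doc comments above state:

    import org.apache.spark.SparkConf

    val conf = new SparkConf(false)

    // Nothing set: the "10s" default used by FsHistoryProvider applies.
    assert(conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s") == 10L)

    // An explicit suffix is converted to seconds.
    conf.set("spark.history.fs.update.interval", "1m")
    assert(conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s") == 60L)

    // A legacy value set through the old *.seconds key is still picked up,
    // and its unsuffixed number is interpreted as seconds.
    val legacy = new SparkConf(false).set("spark.history.fs.update.interval.seconds", "30")
    assert(legacy.getTimeAsSeconds("spark.history.fs.update.interval", "10s") == 30L)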

http://git-wip-us.apache.org/repos/asf/spark/blob/4527761b/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 1b5fdeb..327d155 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -89,10 +89,7 @@ private[spark] class Executor(
     ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME, new ExecutorEndpoint(env.rpcEnv, executorId))
 
   // Whether to load classes in user jars before those in Spark jars
-  private val userClassPathFirst: Boolean = {
-    conf.getBoolean("spark.executor.userClassPathFirst",
-      conf.getBoolean("spark.files.userClassPathFirst", false))
-  }
+  private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)
 
   // Create our ClassLoader
   // do this after SparkEnv creation so can access the SecurityManager

http://git-wip-us.apache.org/repos/asf/spark/blob/4527761b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index e08210a..7d87ba5 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -197,6 +197,28 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
     serializer.newInstance().serialize(new StringBuffer())
   }
 
+  test("deprecated configs") {
+    val conf = new SparkConf()
+    val newName = "spark.history.fs.update.interval"
+
+    assert(!conf.contains(newName))
+
+    conf.set("spark.history.updateInterval", "1")
+    assert(conf.get(newName) === "1")
+
+    conf.set("spark.history.fs.updateInterval", "2")
+    assert(conf.get(newName) === "2")
+
+    conf.set("spark.history.fs.update.interval.seconds", "3")
+    assert(conf.get(newName) === "3")
+
+    conf.set(newName, "4")
+    assert(conf.get(newName) === "4")
+
+    val count = conf.getAll.filter { case (k, v) => k.startsWith("spark.history.") }.size
+    assert(count === 4)
+  }
+
 }
 
 class Class1 {}

http://git-wip-us.apache.org/repos/asf/spark/blob/4527761b/docs/monitoring.md
----------------------------------------------------------------------
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 6816671..2a13022 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -86,10 +86,10 @@ follows:
     </td>
   </tr>
   <tr>
-    <td>spark.history.fs.update.interval.seconds</td>
-    <td>10</td>
+    <td>spark.history.fs.update.interval</td>
+    <td>10s</td>
     <td>
-      The period, in seconds, at which information displayed by this history server is updated.
+      The period at which information displayed by this history server is updated.
       Each update checks for any changes made to the event logs in persisted storage.
     </td>
   </tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/4527761b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1091ff5..52e4dee 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1052,8 +1052,7 @@ object Client extends Logging {
     if (isDriver) {
       conf.getBoolean("spark.driver.userClassPathFirst", false)
     } else {
-      conf.getBoolean("spark.executor.userClassPathFirst",
-        conf.getBoolean("spark.files.userClassPathFirst", false))
+      conf.getBoolean("spark.executor.userClassPathFirst", false)
     }
   }
 

