Posted to commits@spark.apache.org by an...@apache.org on 2015/04/18 04:02:18 UTC

spark git commit: [SPARK-5933] [core] Move config deprecation warnings to SparkConf.

Repository: spark
Updated Branches:
  refs/heads/master 6fbeb82e1 -> 199133733


[SPARK-5933] [core] Move config deprecation warnings to SparkConf.

I didn't find many deprecated configs in a grep-based search,
but the ones I could find were moved to the centralized location
in SparkConf.

While there, I deprecated a couple more history server (HS) configs
that mentioned time units.
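
For illustration, a minimal sketch of what this centralization means for callers, mirroring the new SparkConfSuite test in the diff below (the standalone snippet and the value "42" are illustrative only):

    import org.apache.spark.SparkConf

    // Set the deprecated key; SparkConf now owns the deprecation warning and
    // the translation (10 seconds of wait time per "try"), so reading the new
    // key returns the converted value with no caller-side handling.
    val conf = new SparkConf(loadDefaults = false)
    conf.set("spark.yarn.applicationMaster.waitTries", "42")
    assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") == 420L)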

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #5562 from vanzin/SPARK-5933 and squashes the following commits:

dcb617e7 [Marcelo Vanzin] [SPARK-5933] [core] Move config deprecation warnings to SparkConf.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/19913373
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/19913373
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/19913373

Branch: refs/heads/master
Commit: 1991337336596f94698e79c2366f065c374128ab
Parents: 6fbeb82
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Apr 17 19:02:07 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Apr 17 19:02:07 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala    | 17 ++++++++++++++---
 .../src/main/scala/org/apache/spark/SparkEnv.scala | 10 ++--------
 .../spark/deploy/history/FsHistoryProvider.scala   | 15 +++------------
 .../scala/org/apache/spark/SparkConfSuite.scala    |  3 +++
 docs/monitoring.md                                 | 15 +++++++--------
 .../spark/deploy/yarn/ApplicationMaster.scala      |  9 +--------
 6 files changed, 30 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/19913373/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b0186e9..e3a649d 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -403,6 +403,9 @@ private[spark] object SparkConf extends Logging {
    */
   private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
     val configs = Seq(
+      DeprecatedConfig("spark.cache.class", "0.8",
+        "The spark.cache.class property is no longer being used! Specify storage levels using " +
+        "the RDD.persist() method instead."),
       DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
         "Please use spark.{driver,executor}.userClassPathFirst instead."))
     Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
@@ -420,7 +423,15 @@ private[spark] object SparkConf extends Logging {
     "spark.history.fs.update.interval" -> Seq(
       AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
       AlternateConfig("spark.history.fs.updateInterval", "1.3"),
-      AlternateConfig("spark.history.updateInterval", "1.3"))
+      AlternateConfig("spark.history.updateInterval", "1.3")),
+    "spark.history.fs.cleaner.interval" -> Seq(
+      AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")),
+    "spark.history.fs.cleaner.maxAge" -> Seq(
+      AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")),
+    "spark.yarn.am.waitTime" -> Seq(
+      AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
+        // Translate old value to a duration, with 10s wait time per try.
+        translation = s => s"${s.toLong * 10}s"))
     )
 
   /**
@@ -470,7 +481,7 @@ private[spark] object SparkConf extends Logging {
     configsWithAlternatives.get(key).flatMap { alts =>
       alts.collectFirst { case alt if conf.contains(alt.key) =>
         val value = conf.get(alt.key)
-        alt.translation.map(_(value)).getOrElse(value)
+        if (alt.translation != null) alt.translation(value) else value
       }
     }
   }
@@ -514,6 +525,6 @@ private[spark] object SparkConf extends Logging {
   private case class AlternateConfig(
       key: String,
       version: String,
-      translation: Option[String => String] = None)
+      translation: String => String = null)
 
 }
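
For readers who want the shape of the alternate-key lookup without the surrounding object, a stripped-down sketch (the resolve helper and the settings map are hypothetical; AlternateConfig and the waitTries translation are taken from the hunk above):

    // Hypothetical standalone version of the alternate-key resolution above.
    case class AlternateConfig(
        key: String,
        version: String,
        translation: String => String = null)

    val configsWithAlternatives = Map(
      "spark.yarn.am.waitTime" -> Seq(
        AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
          // Translate old value to a duration, with 10s wait time per try.
          translation = s => s"${s.toLong * 10}s")))

    // Return the value of `key`, falling back to any alternate key that is
    // set and applying its translation (if one is registered).
    def resolve(key: String, settings: Map[String, String]): Option[String] =
      settings.get(key).orElse {
        configsWithAlternatives.getOrElse(key, Nil).collectFirst {
          case alt if settings.contains(alt.key) =>
            val value = settings(alt.key)
            if (alt.translation != null) alt.translation(value) else value
        }
      }

    // resolve("spark.yarn.am.waitTime",
    //   Map("spark.yarn.applicationMaster.waitTries" -> "42"))  // Some("420s")
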

http://git-wip-us.apache.org/repos/asf/spark/blob/19913373/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 0171488..959aefa 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -103,7 +103,7 @@ class SparkEnv (
     // actorSystem.awaitTermination()
 
     // Note that blockTransferService is stopped by BlockManager since it is started by it.
-    
+
     // If we only stop sc, but the driver process still run as a services then we need to delete
     // the tmp dir, if not, it will create too many tmp dirs.
     // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
@@ -375,12 +375,6 @@ object SparkEnv extends Logging {
       "."
     }
 
-    // Warn about deprecated spark.cache.class property
-    if (conf.contains("spark.cache.class")) {
-      logWarning("The spark.cache.class property is no longer being used! Specify storage " +
-        "levels using the RDD.persist() method instead.")
-    }
-
     val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
       new OutputCommitCoordinator(conf)
     }
@@ -406,7 +400,7 @@ object SparkEnv extends Logging {
       shuffleMemoryManager,
       outputCommitCoordinator,
       conf)
-      
+
     // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
     // called, and we only need to do it for driver. Because driver may run as a service, and if we
     // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.

http://git-wip-us.apache.org/repos/asf/spark/blob/19913373/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 9855457..47bdd77 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -52,8 +52,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
 
   // Interval between each cleaner checks for event logs to delete
-  private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleaner.interval.seconds",
-    DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S) * 1000
+  private val CLEAN_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.cleaner.interval", "1d")
 
   private val logDir = conf.getOption("spark.history.fs.logDirectory")
     .map { d => Utils.resolveURI(d).toString }
@@ -130,8 +129,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
       if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
         // A task that periodically cleans event logs on disk.
-        pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS,
-          TimeUnit.MILLISECONDS)
+        pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
       }
     }
   }
@@ -270,8 +268,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     try {
       val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
         .getOrElse(Seq[FileStatus]())
-      val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
-        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+      val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
 
       val now = System.currentTimeMillis()
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
@@ -417,12 +414,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
 private object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
-
-  // One day
-  val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
-
-  // One week
-  val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
 }
 
 private class FsApplicationHistoryInfo(
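
A hedged sketch of how the renamed history-server settings are read with duration strings (the standalone SparkConf and the "12h" value are illustrative; the keys and defaults are the ones introduced above):

    import org.apache.spark.SparkConf

    // Sketch only: the new keys accept suffixed durations such as "s", "m",
    // "h", "d", so the old *.seconds variants are no longer needed.
    val conf = new SparkConf(loadDefaults = false)
    conf.set("spark.history.fs.cleaner.maxAge", "12h")

    val cleanIntervalS = conf.getTimeAsSeconds("spark.history.fs.cleaner.interval", "1d")  // 86400
    val maxAgeMs = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000   // 12h in ms
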

http://git-wip-us.apache.org/repos/asf/spark/blob/19913373/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 7d87ba5..8e6c200 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -217,6 +217,9 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
 
     val count = conf.getAll.filter { case (k, v) => k.startsWith("spark.history.") }.size
     assert(count === 4)
+
+    conf.set("spark.yarn.applicationMaster.waitTries", "42")
+    assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/19913373/docs/monitoring.md
----------------------------------------------------------------------
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 2a13022..8a85928 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -153,19 +153,18 @@ follows:
     </td>
   </tr>
   <tr>
-    <td>spark.history.fs.cleaner.interval.seconds</td>
-    <td>86400</td>
+    <td>spark.history.fs.cleaner.interval</td>
+    <td>1d</td>
     <td>
-      How often the job history cleaner checks for files to delete, in seconds. Defaults to 86400 (one day).
-      Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.seconds.
+      How often the job history cleaner checks for files to delete.
+      Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.
     </td>
   </tr>
   <tr>
-    <td>spark.history.fs.cleaner.maxAge.seconds</td>
-    <td>3600 * 24 * 7</td>
+    <td>spark.history.fs.cleaner.maxAge</td>
+    <td>7d</td>
     <td>
-      Job history files older than this many seconds will be deleted when the history cleaner runs.
-      Defaults to 3600 * 24 * 7 (1 week).
+      Job history files older than this will be deleted when the history cleaner runs.
     </td>
   </tr>
 </table>

http://git-wip-us.apache.org/repos/asf/spark/blob/19913373/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index c357b7a..f7a8420 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -373,14 +373,7 @@ private[spark] class ApplicationMaster(
   private def waitForSparkContextInitialized(): SparkContext = {
     logInfo("Waiting for spark context initialization")
     sparkContextRef.synchronized {
-      val waitTries = sparkConf.getOption("spark.yarn.applicationMaster.waitTries")
-        .map(_.toLong * 10000L)
-      if (waitTries.isDefined) {
-        logWarning(
-          "spark.yarn.applicationMaster.waitTries is deprecated, use spark.yarn.am.waitTime")
-      }
-      val totalWaitTime = sparkConf.getTimeAsMs("spark.yarn.am.waitTime", 
-        s"${waitTries.getOrElse(100000L)}ms")
+      val totalWaitTime = sparkConf.getTimeAsMs("spark.yarn.am.waitTime", "100s")
       val deadline = System.currentTimeMillis() + totalWaitTime
 
       while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) {

