Posted to commits@spark.apache.org by do...@apache.org on 2018/12/30 01:33:56 UTC

[spark] branch master updated: [SPARK-26443][CORE] Use ConfigEntry for hardcoded configs for history category.

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e6d3e7d  [SPARK-26443][CORE] Use ConfigEntry for hardcoded configs for history category.
e6d3e7d is described below

commit e6d3e7d0d8c80adaa51b43d76f1cc83bb9a010b9
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Sat Dec 29 17:33:43 2018 -0800

    [SPARK-26443][CORE] Use ConfigEntry for hardcoded configs for history category.
    
    ## What changes were proposed in this pull request?
    
    This PR migrates the hardcoded "spark.history" configs to `ConfigEntry` and puts them in the `History` config object.
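    
    For example, this patch replaces the untyped lookup in `FsHistoryProvider`
    
    ```scala
    conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
    ```
    
    with a typed entry in `org.apache.spark.internal.config.History` (with `java.util.concurrent.TimeUnit` in scope):
    
    ```scala
    val UPDATE_INTERVAL_S = ConfigBuilder("spark.history.fs.update.interval")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("10s")
    ```
    
    which callers read as `conf.get(History.UPDATE_INTERVAL_S)`. Typed entries also give type-safe writes in tests, e.g. `.set(UI_ACLS_ENABLE, true)` instead of `.set("spark.history.ui.acls.enable", "true")`.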
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #23384 from ueshin/issues/SPARK-26443/hardcoded_history_configs.
    
    Authored-by: Takuya UESHIN <ue...@databricks.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../main/scala/org/apache/spark/SparkConf.scala    |  4 +-
 .../spark/deploy/history/FsHistoryProvider.scala   | 21 +++++-----
 .../spark/deploy/history/HistoryServer.scala       | 16 ++++----
 .../deploy/history/HistoryServerArguments.scala    |  2 +-
 .../org/apache/spark/internal/config/History.scala | 46 +++++++++++++++++++++-
 .../scala/org/apache/spark/SparkConfSuite.scala    |  2 +-
 .../deploy/history/FsHistoryProviderSuite.scala    | 23 ++++++-----
 .../history/HistoryServerArgumentsSuite.scala      |  9 +++--
 .../spark/deploy/history/HistoryServerSuite.scala  | 13 +++---
 9 files changed, 89 insertions(+), 47 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 8d135d3..0b47da1 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -682,11 +682,11 @@ private[spark] object SparkConf extends Logging {
   private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
     "spark.executor.userClassPathFirst" -> Seq(
       AlternateConfig("spark.files.userClassPathFirst", "1.3")),
-    "spark.history.fs.update.interval" -> Seq(
+    UPDATE_INTERVAL_S.key -> Seq(
       AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
       AlternateConfig("spark.history.fs.updateInterval", "1.3"),
       AlternateConfig("spark.history.updateInterval", "1.3")),
-    "spark.history.fs.cleaner.interval" -> Seq(
+    CLEANER_INTERVAL_S.key -> Seq(
       AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")),
     MAX_LOG_AGE_S.key -> Seq(
       AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")),
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index da6e5f0..709a380 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -42,7 +42,7 @@ import org.fusesource.leveldbjni.internal.NativeDB
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.DRIVER_LOG_DFS_DIR
+import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History}
 import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.io.CompressionCodec
@@ -91,24 +91,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   import FsHistoryProvider._
 
   // Interval between safemode checks.
-  private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
-    "spark.history.fs.safemodeCheck.interval", "5s")
+  private val SAFEMODE_CHECK_INTERVAL_S = conf.get(History.SAFEMODE_CHECK_INTERVAL_S)
 
   // Interval between each check for event log updates
-  private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
+  private val UPDATE_INTERVAL_S = conf.get(History.UPDATE_INTERVAL_S)
 
   // Interval between each cleaner checks for event logs to delete
-  private val CLEAN_INTERVAL_S = conf.get(CLEANER_INTERVAL_S)
+  private val CLEAN_INTERVAL_S = conf.get(History.CLEANER_INTERVAL_S)
 
   // Number of threads used to replay event logs.
-  private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS,
-    Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+  private val NUM_PROCESSING_THREADS = conf.get(History.NUM_REPLAY_THREADS)
 
-  private val logDir = conf.get(EVENT_LOG_DIR)
+  private val logDir = conf.get(History.HISTORY_LOG_DIR)
 
-  private val HISTORY_UI_ACLS_ENABLE = conf.getBoolean("spark.history.ui.acls.enable", false)
-  private val HISTORY_UI_ADMIN_ACLS = conf.get("spark.history.ui.admin.acls", "")
-  private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get("spark.history.ui.admin.acls.groups", "")
+  private val HISTORY_UI_ACLS_ENABLE = conf.get(History.UI_ACLS_ENABLE)
+  private val HISTORY_UI_ADMIN_ACLS = conf.get(History.UI_ADMIN_ACLS)
+  private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get(History.UI_ADMIN_ACLS_GROUPS)
   logInfo(s"History server ui acls " + (if (HISTORY_UI_ACLS_ENABLE) "enabled" else "disabled") +
     "; users with admin permissions: " + HISTORY_UI_ADMIN_ACLS.toString +
     "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)
@@ -1089,7 +1087,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 }
 
 private[history] object FsHistoryProvider {
-  private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
 
   private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 5856c70..b930338 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.internal.config.History.HISTORY_SERVER_UI_PORT
+import org.apache.spark.internal.config.History
 import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
@@ -56,7 +56,7 @@ class HistoryServer(
   with Logging with UIRoot with ApplicationCacheOperations {
 
   // How many applications to retain
-  private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
+  private val retainedApplications = conf.get(History.RETAINED_APPLICATIONS)
 
   // How many applications the summary ui displays
   private[history] val maxApplications = conf.get(HISTORY_UI_MAX_APPS);
@@ -273,14 +273,14 @@ object HistoryServer extends Logging {
     initSecurity()
     val securityManager = createSecurityManager(conf)
 
-    val providerName = conf.getOption("spark.history.provider")
+    val providerName = conf.get(History.PROVIDER)
       .getOrElse(classOf[FsHistoryProvider].getName())
     val provider = Utils.classForName(providerName)
       .getConstructor(classOf[SparkConf])
       .newInstance(conf)
       .asInstanceOf[ApplicationHistoryProvider]
 
-    val port = conf.get(HISTORY_SERVER_UI_PORT)
+    val port = conf.get(History.HISTORY_SERVER_UI_PORT)
 
     val server = new HistoryServer(conf, provider, securityManager, port)
     server.bind()
@@ -319,10 +319,12 @@ object HistoryServer extends Logging {
     // from a keytab file so that we can access HDFS beyond the kerberos ticket expiration.
     // As long as it is using Hadoop rpc (hdfs://), a relogin will automatically
     // occur from the keytab.
-    if (conf.getBoolean("spark.history.kerberos.enabled", false)) {
+    if (conf.get(History.KERBEROS_ENABLED)) {
       // if you have enabled kerberos the following 2 params must be set
-      val principalName = conf.get("spark.history.kerberos.principal")
-      val keytabFilename = conf.get("spark.history.kerberos.keytab")
+      val principalName = conf.get(History.KERBEROS_PRINCIPAL)
+        .getOrElse(throw new NoSuchElementException(History.KERBEROS_PRINCIPAL.key))
+      val keytabFilename = conf.get(History.KERBEROS_KEYTAB)
+        .getOrElse(throw new NoSuchElementException(History.KERBEROS_KEYTAB.key))
       SparkHadoopUtil.get.loginUserFromKeytab(principalName, keytabFilename)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index 49f00cb..dec8976 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -79,7 +79,7 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin
       |
       |  spark.history.fs.logDirectory      Directory where app logs are stored
       |                                     (default: file:/tmp/spark-events)
-      |  spark.history.fs.updateInterval    How often to reload log data from storage
+      |  spark.history.fs.update.interval   How often to reload log data from storage
       |                                     (in seconds, default: 10)
       |""".stripMargin)
     // scalastyle:on println
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index b7d8061..f984dd3 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -25,10 +25,18 @@ private[spark] object History {
 
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
 
-  val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
+  val HISTORY_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
     .stringConf
     .createWithDefault(DEFAULT_LOG_DIR)
 
+  val SAFEMODE_CHECK_INTERVAL_S = ConfigBuilder("spark.history.fs.safemodeCheck.interval")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("5s")
+
+  val UPDATE_INTERVAL_S = ConfigBuilder("spark.history.fs.update.interval")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("10s")
+
   val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled")
     .booleanConf
     .createWithDefault(false)
@@ -79,4 +87,40 @@ private[spark] object History {
 
   val MAX_DRIVER_LOG_AGE_S = ConfigBuilder("spark.history.fs.driverlog.cleaner.maxAge")
     .fallbackConf(MAX_LOG_AGE_S)
+
+  val UI_ACLS_ENABLE = ConfigBuilder("spark.history.ui.acls.enable")
+    .booleanConf
+    .createWithDefault(false)
+
+  val UI_ADMIN_ACLS = ConfigBuilder("spark.history.ui.admin.acls")
+    .stringConf
+    .createWithDefault("")
+
+  val UI_ADMIN_ACLS_GROUPS = ConfigBuilder("spark.history.ui.admin.acls.groups")
+    .stringConf
+    .createWithDefault("")
+
+  val NUM_REPLAY_THREADS = ConfigBuilder("spark.history.fs.numReplayThreads")
+    .intConf
+    .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+
+  val RETAINED_APPLICATIONS = ConfigBuilder("spark.history.retainedApplications")
+    .intConf
+    .createWithDefault(50)
+
+  val PROVIDER = ConfigBuilder("spark.history.provider")
+    .stringConf
+    .createOptional
+
+  val KERBEROS_ENABLED = ConfigBuilder("spark.history.kerberos.enabled")
+    .booleanConf
+    .createWithDefault(false)
+
+  val KERBEROS_PRINCIPAL = ConfigBuilder("spark.history.kerberos.principal")
+    .stringConf
+    .createOptional
+
+  val KERBEROS_KEYTAB = ConfigBuilder("spark.history.kerberos.keytab")
+    .stringConf
+    .createOptional
 }
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 7cb03de..e14a5dc 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -232,7 +232,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
 
   test("deprecated configs") {
     val conf = new SparkConf()
-    val newName = "spark.history.fs.update.interval"
+    val newName = UPDATE_INTERVAL_S.key
 
     assert(!conf.contains(newName))
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index c1ae27a..6d2e329 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -294,7 +294,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val maxAge = TimeUnit.SECONDS.toMillis(10)
     val clock = new ManualClock(maxAge / 2)
     val provider = new FsHistoryProvider(
-      createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+      createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)
 
     val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
     writeFile(log1, true, None,
@@ -379,7 +379,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val maxAge = TimeUnit.SECONDS.toMillis(40)
     val clock = new ManualClock(0)
     val provider = new FsHistoryProvider(
-      createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+      createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)
 
     val log1 = newLogFile("inProgressApp1", None, inProgress = true)
     writeFile(log1, true, None,
@@ -462,8 +462,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val maxAge = TimeUnit.SECONDS.toSeconds(40)
     val clock = new ManualClock(0)
     val testConf = new SparkConf()
-    testConf.set("spark.history.fs.logDirectory",
-      Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())
+    testConf.set(HISTORY_LOG_DIR, Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())
     testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath())
     testConf.set(DRIVER_LOG_CLEANER_ENABLED, true)
     testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4)
@@ -645,9 +644,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
     // Test both history ui admin acls and application acls are configured.
     val conf1 = createTestConf()
-      .set("spark.history.ui.acls.enable", "true")
-      .set("spark.history.ui.admin.acls", "user1,user2")
-      .set("spark.history.ui.admin.acls.groups", "group1")
+      .set(UI_ACLS_ENABLE, true)
+      .set(UI_ADMIN_ACLS, "user1,user2")
+      .set(UI_ADMIN_ACLS_GROUPS, "group1")
       .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
 
     createAndCheck(conf1, ("spark.admin.acls", "user"), ("spark.admin.acls.groups", "group")) {
@@ -667,9 +666,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
     // Test only history ui admin acls are configured.
     val conf2 = createTestConf()
-      .set("spark.history.ui.acls.enable", "true")
-      .set("spark.history.ui.admin.acls", "user1,user2")
-      .set("spark.history.ui.admin.acls.groups", "group1")
+      .set(UI_ACLS_ENABLE, true)
+      .set(UI_ADMIN_ACLS, "user1,user2")
+      .set(UI_ADMIN_ACLS_GROUPS, "group1")
       .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
     createAndCheck(conf2) { securityManager =>
       // Test whether user has permission to access UI.
@@ -687,7 +686,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
     // Test neither history ui admin acls nor application acls are configured.
      val conf3 = createTestConf()
-      .set("spark.history.ui.acls.enable", "true")
+      .set(UI_ACLS_ENABLE, true)
       .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
     createAndCheck(conf3) { securityManager =>
       // Test whether user has permission to access UI.
@@ -1036,7 +1035,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
   private def createTestConf(inMemory: Boolean = false): SparkConf = {
     val conf = new SparkConf()
-      .set(EVENT_LOG_DIR, testDir.getAbsolutePath())
+      .set(HISTORY_LOG_DIR, testDir.getAbsolutePath())
       .set(FAST_IN_PROGRESS_PARSING, true)
 
     if (!inMemory) {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
index e89733a..6b47987 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
@@ -22,21 +22,22 @@ import java.nio.charset.StandardCharsets._
 import com.google.common.io.Files
 
 import org.apache.spark._
+import org.apache.spark.internal.config.History._
 import org.apache.spark.util.Utils
 
 class HistoryServerArgumentsSuite extends SparkFunSuite {
 
   private val logDir = new File("src/test/resources/spark-events")
   private val conf = new SparkConf()
-    .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
-    .set("spark.history.fs.updateInterval", "1")
+    .set(HISTORY_LOG_DIR, logDir.getAbsolutePath)
+    .set(UPDATE_INTERVAL_S, 1L)
     .set("spark.testing", "true")
 
   test("No Arguments Parsing") {
     val argStrings = Array.empty[String]
     val hsa = new HistoryServerArguments(conf, argStrings)
-    assert(conf.get("spark.history.fs.logDirectory") === logDir.getAbsolutePath)
-    assert(conf.get("spark.history.fs.updateInterval") === "1")
+    assert(conf.get(HISTORY_LOG_DIR) === logDir.getAbsolutePath)
+    assert(conf.get(UPDATE_INTERVAL_S) === 1L)
     assert(conf.get("spark.testing") === "true")
   }
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 2a2d013..a9dee67 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -78,8 +78,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     Utils.deleteRecursively(storeDir)
     assert(storeDir.mkdir())
     val conf = new SparkConf()
-      .set("spark.history.fs.logDirectory", logDir)
-      .set("spark.history.fs.update.interval", "0")
+      .set(HISTORY_LOG_DIR, logDir)
+      .set(UPDATE_INTERVAL_S.key, "0")
       .set("spark.testing", "true")
       .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
       .set("spark.eventLog.logStageExecutorMetrics.enabled", "true")
@@ -416,11 +416,10 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     // allowed refresh rate (1Hz)
     stop()
     val myConf = new SparkConf()
-      .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+      .set(HISTORY_LOG_DIR, logDir.getAbsolutePath)
       .set("spark.eventLog.dir", logDir.getAbsolutePath)
-      .set("spark.history.fs.update.interval", "1s")
+      .set(UPDATE_INTERVAL_S.key, "1s")
       .set("spark.eventLog.enabled", "true")
-      .set("spark.history.cache.window", "250ms")
       .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
       .remove("spark.testing")
     val provider = new FsHistoryProvider(myConf)
@@ -613,8 +612,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     stop()
     init(
       "spark.ui.filters" -> classOf[FakeAuthFilter].getName(),
-      "spark.history.ui.acls.enable" -> "true",
-      "spark.history.ui.admin.acls" -> admin)
+      UI_ACLS_ENABLE.key -> "true",
+      UI_ADMIN_ACLS.key -> admin)
 
     val tests = Seq(
       (owner, HttpServletResponse.SC_OK),

