Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/30 01:37:28 UTC

[GitHub] asfgit closed pull request #23384: [SPARK-26443][CORE] Use ConfigEntry for hardcoded configs for history category.

asfgit closed pull request #23384: [SPARK-26443][CORE] Use ConfigEntry for hardcoded configs for history category.
URL: https://github.com/apache/spark/pull/23384

This is a PR merged from a forked repository. As GitHub hides the
original diff on merge, it is displayed below for the sake of provenance.

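Before the diff, the pattern it applies, in brief: string-based lookups
like conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s") are
replaced by typed ConfigEntry constants defined once in
org.apache.spark.internal.config.History. A minimal sketch of that pattern
follows; FOO_INTERVAL_S and its key are hypothetical names used only for
illustration, and the snippet assumes it lives under the org.apache.spark
package, since ConfigBuilder and the typed SparkConf accessors are
private[spark]:

    package org.apache.spark.demo  // hypothetical; the builder API is private[spark]

    import java.util.concurrent.TimeUnit

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.ConfigBuilder

    object ConfigSketch {
      // One entry carries the key, the value type, and the default in one place.
      // "spark.history.foo.interval" is a made-up key, not part of this PR.
      val FOO_INTERVAL_S = ConfigBuilder("spark.history.foo.interval")
        .timeConf(TimeUnit.SECONDS)   // accepts "10s", "1m", ...; yields seconds
        .createWithDefaultString("10s")

      // Before: conf.getTimeAsSeconds("spark.history.foo.interval", "10s"),
      // with the default string repeated at every call site.
      // After: the entry supplies key, parsing, and default:
      def interval(conf: SparkConf): Long = conf.get(FOO_INTERVAL_S)
    }
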
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 8d135d3e083d7..0b47da12b5b42 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -682,11 +682,11 @@ private[spark] object SparkConf extends Logging {
   private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
     "spark.executor.userClassPathFirst" -> Seq(
       AlternateConfig("spark.files.userClassPathFirst", "1.3")),
-    "spark.history.fs.update.interval" -> Seq(
+    UPDATE_INTERVAL_S.key -> Seq(
       AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
       AlternateConfig("spark.history.fs.updateInterval", "1.3"),
       AlternateConfig("spark.history.updateInterval", "1.3")),
-    "spark.history.fs.cleaner.interval" -> Seq(
+    CLEANER_INTERVAL_S.key -> Seq(
       AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")),
     MAX_LOG_AGE_S.key -> Seq(
       AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")),
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index da6e5f03aabb5..709a380dfb636 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -42,7 +42,7 @@ import org.fusesource.leveldbjni.internal.NativeDB
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.DRIVER_LOG_DFS_DIR
+import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History}
 import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.io.CompressionCodec
@@ -91,24 +91,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   import FsHistoryProvider._
 
   // Interval between safemode checks.
-  private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
-    "spark.history.fs.safemodeCheck.interval", "5s")
+  private val SAFEMODE_CHECK_INTERVAL_S = conf.get(History.SAFEMODE_CHECK_INTERVAL_S)
 
   // Interval between each check for event log updates
-  private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
+  private val UPDATE_INTERVAL_S = conf.get(History.UPDATE_INTERVAL_S)
 
   // Interval between each cleaner checks for event logs to delete
-  private val CLEAN_INTERVAL_S = conf.get(CLEANER_INTERVAL_S)
+  private val CLEAN_INTERVAL_S = conf.get(History.CLEANER_INTERVAL_S)
 
   // Number of threads used to replay event logs.
-  private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS,
-    Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+  private val NUM_PROCESSING_THREADS = conf.get(History.NUM_REPLAY_THREADS)
 
-  private val logDir = conf.get(EVENT_LOG_DIR)
+  private val logDir = conf.get(History.HISTORY_LOG_DIR)
 
-  private val HISTORY_UI_ACLS_ENABLE = conf.getBoolean("spark.history.ui.acls.enable", false)
-  private val HISTORY_UI_ADMIN_ACLS = conf.get("spark.history.ui.admin.acls", "")
-  private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get("spark.history.ui.admin.acls.groups", "")
+  private val HISTORY_UI_ACLS_ENABLE = conf.get(History.UI_ACLS_ENABLE)
+  private val HISTORY_UI_ADMIN_ACLS = conf.get(History.UI_ADMIN_ACLS)
+  private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get(History.UI_ADMIN_ACLS_GROUPS)
   logInfo(s"History server ui acls " + (if (HISTORY_UI_ACLS_ENABLE) "enabled" else "disabled") +
     "; users with admin permissions: " + HISTORY_UI_ADMIN_ACLS.toString +
     "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)
@@ -1089,7 +1087,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 }
 
 private[history] object FsHistoryProvider {
-  private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
 
   private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 5856c7057b745..b9303388638fd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.internal.config.History.HISTORY_SERVER_UI_PORT
+import org.apache.spark.internal.config.History
 import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
@@ -56,7 +56,7 @@ class HistoryServer(
   with Logging with UIRoot with ApplicationCacheOperations {
 
   // How many applications to retain
-  private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
+  private val retainedApplications = conf.get(History.RETAINED_APPLICATIONS)
 
   // How many applications the summary ui displays
   private[history] val maxApplications = conf.get(HISTORY_UI_MAX_APPS);
@@ -273,14 +273,14 @@ object HistoryServer extends Logging {
     initSecurity()
     val securityManager = createSecurityManager(conf)
 
-    val providerName = conf.getOption("spark.history.provider")
+    val providerName = conf.get(History.PROVIDER)
       .getOrElse(classOf[FsHistoryProvider].getName())
     val provider = Utils.classForName(providerName)
       .getConstructor(classOf[SparkConf])
       .newInstance(conf)
       .asInstanceOf[ApplicationHistoryProvider]
 
-    val port = conf.get(HISTORY_SERVER_UI_PORT)
+    val port = conf.get(History.HISTORY_SERVER_UI_PORT)
 
     val server = new HistoryServer(conf, provider, securityManager, port)
     server.bind()
@@ -319,10 +319,12 @@ object HistoryServer extends Logging {
     // from a keytab file so that we can access HDFS beyond the kerberos ticket expiration.
     // As long as it is using Hadoop rpc (hdfs://), a relogin will automatically
     // occur from the keytab.
-    if (conf.getBoolean("spark.history.kerberos.enabled", false)) {
+    if (conf.get(History.KERBEROS_ENABLED)) {
       // if you have enabled kerberos the following 2 params must be set
-      val principalName = conf.get("spark.history.kerberos.principal")
-      val keytabFilename = conf.get("spark.history.kerberos.keytab")
+      val principalName = conf.get(History.KERBEROS_PRINCIPAL)
+        .getOrElse(throw new NoSuchElementException(History.KERBEROS_PRINCIPAL.key))
+      val keytabFilename = conf.get(History.KERBEROS_KEYTAB)
+        .getOrElse(throw new NoSuchElementException(History.KERBEROS_KEYTAB.key))
       SparkHadoopUtil.get.loginUserFromKeytab(principalName, keytabFilename)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index 49f00cb10179e..dec89769c030b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -79,7 +79,7 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin
       |
       |  spark.history.fs.logDirectory      Directory where app logs are stored
       |                                     (default: file:/tmp/spark-events)
-      |  spark.history.fs.updateInterval    How often to reload log data from storage
+      |  spark.history.fs.update.interval   How often to reload log data from storage
       |                                     (in seconds, default: 10)
       |""".stripMargin)
     // scalastyle:on println
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index b7d8061d26d21..f984dd385344b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -25,10 +25,18 @@ private[spark] object History {
 
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
 
-  val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
+  val HISTORY_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
     .stringConf
     .createWithDefault(DEFAULT_LOG_DIR)
 
+  val SAFEMODE_CHECK_INTERVAL_S = ConfigBuilder("spark.history.fs.safemodeCheck.interval")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("5s")
+
+  val UPDATE_INTERVAL_S = ConfigBuilder("spark.history.fs.update.interval")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("10s")
+
   val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled")
     .booleanConf
     .createWithDefault(false)
@@ -79,4 +87,40 @@ private[spark] object History {
 
   val MAX_DRIVER_LOG_AGE_S = ConfigBuilder("spark.history.fs.driverlog.cleaner.maxAge")
     .fallbackConf(MAX_LOG_AGE_S)
+
+  val UI_ACLS_ENABLE = ConfigBuilder("spark.history.ui.acls.enable")
+    .booleanConf
+    .createWithDefault(false)
+
+  val UI_ADMIN_ACLS = ConfigBuilder("spark.history.ui.admin.acls")
+    .stringConf
+    .createWithDefault("")
+
+  val UI_ADMIN_ACLS_GROUPS = ConfigBuilder("spark.history.ui.admin.acls.groups")
+    .stringConf
+    .createWithDefault("")
+
+  val NUM_REPLAY_THREADS = ConfigBuilder("spark.history.fs.numReplayThreads")
+    .intConf
+    .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+
+  val RETAINED_APPLICATIONS = ConfigBuilder("spark.history.retainedApplications")
+    .intConf
+    .createWithDefault(50)
+
+  val PROVIDER = ConfigBuilder("spark.history.provider")
+    .stringConf
+    .createOptional
+
+  val KERBEROS_ENABLED = ConfigBuilder("spark.history.kerberos.enabled")
+    .booleanConf
+    .createWithDefault(false)
+
+  val KERBEROS_PRINCIPAL = ConfigBuilder("spark.history.kerberos.principal")
+    .stringConf
+    .createOptional
+
+  val KERBEROS_KEYTAB = ConfigBuilder("spark.history.kerberos.keytab")
+    .stringConf
+    .createOptional
 }
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 7cb03deae1391..e14a5dcb5ef84 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -232,7 +232,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
 
   test("deprecated configs") {
     val conf = new SparkConf()
-    val newName = "spark.history.fs.update.interval"
+    val newName = UPDATE_INTERVAL_S.key
 
     assert(!conf.contains(newName))
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index c1ae27aa940f6..6d2e329094ae2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -294,7 +294,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val maxAge = TimeUnit.SECONDS.toMillis(10)
     val clock = new ManualClock(maxAge / 2)
     val provider = new FsHistoryProvider(
-      createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+      createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)
 
     val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
     writeFile(log1, true, None,
@@ -379,7 +379,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val maxAge = TimeUnit.SECONDS.toMillis(40)
     val clock = new ManualClock(0)
     val provider = new FsHistoryProvider(
-      createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+      createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)
 
     val log1 = newLogFile("inProgressApp1", None, inProgress = true)
     writeFile(log1, true, None,
@@ -462,8 +462,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val maxAge = TimeUnit.SECONDS.toSeconds(40)
     val clock = new ManualClock(0)
     val testConf = new SparkConf()
-    testConf.set("spark.history.fs.logDirectory",
-      Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())
+    testConf.set(HISTORY_LOG_DIR, Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())
     testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath())
     testConf.set(DRIVER_LOG_CLEANER_ENABLED, true)
     testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4)
@@ -645,9 +644,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
     // Test both history ui admin acls and application acls are configured.
     val conf1 = createTestConf()
-      .set("spark.history.ui.acls.enable", "true")
-      .set("spark.history.ui.admin.acls", "user1,user2")
-      .set("spark.history.ui.admin.acls.groups", "group1")
+      .set(UI_ACLS_ENABLE, true)
+      .set(UI_ADMIN_ACLS, "user1,user2")
+      .set(UI_ADMIN_ACLS_GROUPS, "group1")
       .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
 
     createAndCheck(conf1, ("spark.admin.acls", "user"), ("spark.admin.acls.groups", "group")) {
@@ -667,9 +666,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
     // Test only history ui admin acls are configured.
     val conf2 = createTestConf()
-      .set("spark.history.ui.acls.enable", "true")
-      .set("spark.history.ui.admin.acls", "user1,user2")
-      .set("spark.history.ui.admin.acls.groups", "group1")
+      .set(UI_ACLS_ENABLE, true)
+      .set(UI_ADMIN_ACLS, "user1,user2")
+      .set(UI_ADMIN_ACLS_GROUPS, "group1")
       .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
     createAndCheck(conf2) { securityManager =>
       // Test whether user has permission to access UI.
@@ -687,7 +686,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
     // Test neither history ui admin acls nor application acls are configured.
      val conf3 = createTestConf()
-      .set("spark.history.ui.acls.enable", "true")
+      .set(UI_ACLS_ENABLE, true)
       .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
     createAndCheck(conf3) { securityManager =>
       // Test whether user has permission to access UI.
@@ -1036,7 +1035,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
   private def createTestConf(inMemory: Boolean = false): SparkConf = {
     val conf = new SparkConf()
-      .set(EVENT_LOG_DIR, testDir.getAbsolutePath())
+      .set(HISTORY_LOG_DIR, testDir.getAbsolutePath())
       .set(FAST_IN_PROGRESS_PARSING, true)
 
     if (!inMemory) {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
index e89733a144cfa..6b479873f69f2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
@@ -22,21 +22,22 @@ import java.nio.charset.StandardCharsets._
 import com.google.common.io.Files
 
 import org.apache.spark._
+import org.apache.spark.internal.config.History._
 import org.apache.spark.util.Utils
 
 class HistoryServerArgumentsSuite extends SparkFunSuite {
 
   private val logDir = new File("src/test/resources/spark-events")
   private val conf = new SparkConf()
-    .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
-    .set("spark.history.fs.updateInterval", "1")
+    .set(HISTORY_LOG_DIR, logDir.getAbsolutePath)
+    .set(UPDATE_INTERVAL_S, 1L)
     .set("spark.testing", "true")
 
   test("No Arguments Parsing") {
     val argStrings = Array.empty[String]
     val hsa = new HistoryServerArguments(conf, argStrings)
-    assert(conf.get("spark.history.fs.logDirectory") === logDir.getAbsolutePath)
-    assert(conf.get("spark.history.fs.updateInterval") === "1")
+    assert(conf.get(HISTORY_LOG_DIR) === logDir.getAbsolutePath)
+    assert(conf.get(UPDATE_INTERVAL_S) === 1L)
     assert(conf.get("spark.testing") === "true")
   }
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 2a2d013bacbda..a9dee67ae9383 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -78,8 +78,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     Utils.deleteRecursively(storeDir)
     assert(storeDir.mkdir())
     val conf = new SparkConf()
-      .set("spark.history.fs.logDirectory", logDir)
-      .set("spark.history.fs.update.interval", "0")
+      .set(HISTORY_LOG_DIR, logDir)
+      .set(UPDATE_INTERVAL_S.key, "0")
       .set("spark.testing", "true")
       .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
       .set("spark.eventLog.logStageExecutorMetrics.enabled", "true")
@@ -416,11 +416,10 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     // allowed refresh rate (1Hz)
     stop()
     val myConf = new SparkConf()
-      .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+      .set(HISTORY_LOG_DIR, logDir.getAbsolutePath)
       .set("spark.eventLog.dir", logDir.getAbsolutePath)
-      .set("spark.history.fs.update.interval", "1s")
+      .set(UPDATE_INTERVAL_S.key, "1s")
       .set("spark.eventLog.enabled", "true")
-      .set("spark.history.cache.window", "250ms")
       .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
       .remove("spark.testing")
     val provider = new FsHistoryProvider(myConf)
@@ -613,8 +612,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     stop()
     init(
       "spark.ui.filters" -> classOf[FakeAuthFilter].getName(),
-      "spark.history.ui.acls.enable" -> "true",
-      "spark.history.ui.admin.acls" -> admin)
+      UI_ACLS_ENABLE.key -> "true",
+      UI_ADMIN_ACLS.key -> admin)
 
     val tests = Seq(
       (owner, HttpServletResponse.SC_OK),

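For readers tracing the new call sites above, a short REPL-style sketch of
how the typed entries behave (same private[spark] caveat as the sketch
before the diff; the values are examples only):

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.History

    val conf = new SparkConf()

    // Typed set/get round-trips with the declared type (seconds as Long here).
    conf.set(History.UPDATE_INTERVAL_S, 30L)
    assert(conf.get(History.UPDATE_INTERVAL_S) == 30L)

    // Entries built with createOptional come back as Option[_], which is why
    // HistoryServer now calls .getOrElse(throw new NoSuchElementException(...))
    // for the kerberos principal and keytab.
    val principal: Option[String] = conf.get(History.KERBEROS_PRINCIPAL)

    // The raw key string is still available where a plain string is needed,
    // as the tests do with UPDATE_INTERVAL_S.key
    // ("spark.history.fs.update.interval").
    conf.set(History.UPDATE_INTERVAL_S.key, "1s")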


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org