You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2018/12/30 01:33:56 UTC
[spark] branch master updated: [SPARK-26443][CORE] Use ConfigEntry
for hardcoded configs for history category.
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e6d3e7d [SPARK-26443][CORE] Use ConfigEntry for hardcoded configs for history category.
e6d3e7d is described below
commit e6d3e7d0d8c80adaa51b43d76f1cc83bb9a010b9
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Sat Dec 29 17:33:43 2018 -0800
[SPARK-26443][CORE] Use ConfigEntry for hardcoded configs for history category.
## What changes were proposed in this pull request?
This PR makes the hardcoded "spark.history" configs use `ConfigEntry` and puts them in the `History` config object.
## How was this patch tested?
Existing tests.
Closes #23384 from ueshin/issues/SPARK-26443/hardcoded_history_configs.
Authored-by: Takuya UESHIN <ue...@databricks.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../main/scala/org/apache/spark/SparkConf.scala | 4 +-
.../spark/deploy/history/FsHistoryProvider.scala | 21 +++++-----
.../spark/deploy/history/HistoryServer.scala | 16 ++++----
.../deploy/history/HistoryServerArguments.scala | 2 +-
.../org/apache/spark/internal/config/History.scala | 46 +++++++++++++++++++++-
.../scala/org/apache/spark/SparkConfSuite.scala | 2 +-
.../deploy/history/FsHistoryProviderSuite.scala | 23 ++++++-----
.../history/HistoryServerArgumentsSuite.scala | 9 +++--
.../spark/deploy/history/HistoryServerSuite.scala | 13 +++---
9 files changed, 89 insertions(+), 47 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 8d135d3..0b47da1 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -682,11 +682,11 @@ private[spark] object SparkConf extends Logging {
private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
"spark.executor.userClassPathFirst" -> Seq(
AlternateConfig("spark.files.userClassPathFirst", "1.3")),
- "spark.history.fs.update.interval" -> Seq(
+ UPDATE_INTERVAL_S.key -> Seq(
AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
AlternateConfig("spark.history.fs.updateInterval", "1.3"),
AlternateConfig("spark.history.updateInterval", "1.3")),
- "spark.history.fs.cleaner.interval" -> Seq(
+ CLEANER_INTERVAL_S.key -> Seq(
AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")),
MAX_LOG_AGE_S.key -> Seq(
AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")),
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index da6e5f0..709a380 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -42,7 +42,7 @@ import org.fusesource.leveldbjni.internal.NativeDB
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.DRIVER_LOG_DFS_DIR
+import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History}
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Status._
import org.apache.spark.io.CompressionCodec
@@ -91,24 +91,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
import FsHistoryProvider._
// Interval between safemode checks.
- private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
- "spark.history.fs.safemodeCheck.interval", "5s")
+ private val SAFEMODE_CHECK_INTERVAL_S = conf.get(History.SAFEMODE_CHECK_INTERVAL_S)
// Interval between each check for event log updates
- private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
+ private val UPDATE_INTERVAL_S = conf.get(History.UPDATE_INTERVAL_S)
// Interval between each cleaner checks for event logs to delete
- private val CLEAN_INTERVAL_S = conf.get(CLEANER_INTERVAL_S)
+ private val CLEAN_INTERVAL_S = conf.get(History.CLEANER_INTERVAL_S)
// Number of threads used to replay event logs.
- private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS,
- Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+ private val NUM_PROCESSING_THREADS = conf.get(History.NUM_REPLAY_THREADS)
- private val logDir = conf.get(EVENT_LOG_DIR)
+ private val logDir = conf.get(History.HISTORY_LOG_DIR)
- private val HISTORY_UI_ACLS_ENABLE = conf.getBoolean("spark.history.ui.acls.enable", false)
- private val HISTORY_UI_ADMIN_ACLS = conf.get("spark.history.ui.admin.acls", "")
- private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get("spark.history.ui.admin.acls.groups", "")
+ private val HISTORY_UI_ACLS_ENABLE = conf.get(History.UI_ACLS_ENABLE)
+ private val HISTORY_UI_ADMIN_ACLS = conf.get(History.UI_ADMIN_ACLS)
+ private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get(History.UI_ADMIN_ACLS_GROUPS)
logInfo(s"History server ui acls " + (if (HISTORY_UI_ACLS_ENABLE) "enabled" else "disabled") +
"; users with admin permissions: " + HISTORY_UI_ADMIN_ACLS.toString +
"; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)
@@ -1089,7 +1087,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
private[history] object FsHistoryProvider {
- private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 5856c70..b930338 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
-import org.apache.spark.internal.config.History.HISTORY_SERVER_UI_PORT
+import org.apache.spark.internal.config.History
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
@@ -56,7 +56,7 @@ class HistoryServer(
with Logging with UIRoot with ApplicationCacheOperations {
// How many applications to retain
- private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
+ private val retainedApplications = conf.get(History.RETAINED_APPLICATIONS)
// How many applications the summary ui displays
private[history] val maxApplications = conf.get(HISTORY_UI_MAX_APPS);
@@ -273,14 +273,14 @@ object HistoryServer extends Logging {
initSecurity()
val securityManager = createSecurityManager(conf)
- val providerName = conf.getOption("spark.history.provider")
+ val providerName = conf.get(History.PROVIDER)
.getOrElse(classOf[FsHistoryProvider].getName())
val provider = Utils.classForName(providerName)
.getConstructor(classOf[SparkConf])
.newInstance(conf)
.asInstanceOf[ApplicationHistoryProvider]
- val port = conf.get(HISTORY_SERVER_UI_PORT)
+ val port = conf.get(History.HISTORY_SERVER_UI_PORT)
val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()
@@ -319,10 +319,12 @@ object HistoryServer extends Logging {
// from a keytab file so that we can access HDFS beyond the kerberos ticket expiration.
// As long as it is using Hadoop rpc (hdfs://), a relogin will automatically
// occur from the keytab.
- if (conf.getBoolean("spark.history.kerberos.enabled", false)) {
+ if (conf.get(History.KERBEROS_ENABLED)) {
// if you have enabled kerberos the following 2 params must be set
- val principalName = conf.get("spark.history.kerberos.principal")
- val keytabFilename = conf.get("spark.history.kerberos.keytab")
+ val principalName = conf.get(History.KERBEROS_PRINCIPAL)
+ .getOrElse(throw new NoSuchElementException(History.KERBEROS_PRINCIPAL.key))
+ val keytabFilename = conf.get(History.KERBEROS_KEYTAB)
+ .getOrElse(throw new NoSuchElementException(History.KERBEROS_KEYTAB.key))
SparkHadoopUtil.get.loginUserFromKeytab(principalName, keytabFilename)
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index 49f00cb..dec8976 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -79,7 +79,7 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin
|
| spark.history.fs.logDirectory Directory where app logs are stored
| (default: file:/tmp/spark-events)
- | spark.history.fs.updateInterval How often to reload log data from storage
+ | spark.history.fs.update.interval How often to reload log data from storage
| (in seconds, default: 10)
|""".stripMargin)
// scalastyle:on println
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index b7d8061..f984dd3 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -25,10 +25,18 @@ private[spark] object History {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
- val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
+ val HISTORY_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
.stringConf
.createWithDefault(DEFAULT_LOG_DIR)
+ val SAFEMODE_CHECK_INTERVAL_S = ConfigBuilder("spark.history.fs.safemodeCheck.interval")
+ .timeConf(TimeUnit.SECONDS)
+ .createWithDefaultString("5s")
+
+ val UPDATE_INTERVAL_S = ConfigBuilder("spark.history.fs.update.interval")
+ .timeConf(TimeUnit.SECONDS)
+ .createWithDefaultString("10s")
+
val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled")
.booleanConf
.createWithDefault(false)
@@ -79,4 +87,40 @@ private[spark] object History {
val MAX_DRIVER_LOG_AGE_S = ConfigBuilder("spark.history.fs.driverlog.cleaner.maxAge")
.fallbackConf(MAX_LOG_AGE_S)
+
+ val UI_ACLS_ENABLE = ConfigBuilder("spark.history.ui.acls.enable")
+ .booleanConf
+ .createWithDefault(false)
+
+ val UI_ADMIN_ACLS = ConfigBuilder("spark.history.ui.admin.acls")
+ .stringConf
+ .createWithDefault("")
+
+ val UI_ADMIN_ACLS_GROUPS = ConfigBuilder("spark.history.ui.admin.acls.groups")
+ .stringConf
+ .createWithDefault("")
+
+ val NUM_REPLAY_THREADS = ConfigBuilder("spark.history.fs.numReplayThreads")
+ .intConf
+ .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+
+ val RETAINED_APPLICATIONS = ConfigBuilder("spark.history.retainedApplications")
+ .intConf
+ .createWithDefault(50)
+
+ val PROVIDER = ConfigBuilder("spark.history.provider")
+ .stringConf
+ .createOptional
+
+ val KERBEROS_ENABLED = ConfigBuilder("spark.history.kerberos.enabled")
+ .booleanConf
+ .createWithDefault(false)
+
+ val KERBEROS_PRINCIPAL = ConfigBuilder("spark.history.kerberos.principal")
+ .stringConf
+ .createOptional
+
+ val KERBEROS_KEYTAB = ConfigBuilder("spark.history.kerberos.keytab")
+ .stringConf
+ .createOptional
}
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 7cb03de..e14a5dc 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -232,7 +232,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
test("deprecated configs") {
val conf = new SparkConf()
- val newName = "spark.history.fs.update.interval"
+ val newName = UPDATE_INTERVAL_S.key
assert(!conf.contains(newName))
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index c1ae27a..6d2e329 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -294,7 +294,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val maxAge = TimeUnit.SECONDS.toMillis(10)
val clock = new ManualClock(maxAge / 2)
val provider = new FsHistoryProvider(
- createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+ createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)
val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
writeFile(log1, true, None,
@@ -379,7 +379,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val maxAge = TimeUnit.SECONDS.toMillis(40)
val clock = new ManualClock(0)
val provider = new FsHistoryProvider(
- createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+ createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)
val log1 = newLogFile("inProgressApp1", None, inProgress = true)
writeFile(log1, true, None,
@@ -462,8 +462,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val maxAge = TimeUnit.SECONDS.toSeconds(40)
val clock = new ManualClock(0)
val testConf = new SparkConf()
- testConf.set("spark.history.fs.logDirectory",
- Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())
+ testConf.set(HISTORY_LOG_DIR, Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())
testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath())
testConf.set(DRIVER_LOG_CLEANER_ENABLED, true)
testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4)
@@ -645,9 +644,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
// Test both history ui admin acls and application acls are configured.
val conf1 = createTestConf()
- .set("spark.history.ui.acls.enable", "true")
- .set("spark.history.ui.admin.acls", "user1,user2")
- .set("spark.history.ui.admin.acls.groups", "group1")
+ .set(UI_ACLS_ENABLE, true)
+ .set(UI_ADMIN_ACLS, "user1,user2")
+ .set(UI_ADMIN_ACLS_GROUPS, "group1")
.set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
createAndCheck(conf1, ("spark.admin.acls", "user"), ("spark.admin.acls.groups", "group")) {
@@ -667,9 +666,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
// Test only history ui admin acls are configured.
val conf2 = createTestConf()
- .set("spark.history.ui.acls.enable", "true")
- .set("spark.history.ui.admin.acls", "user1,user2")
- .set("spark.history.ui.admin.acls.groups", "group1")
+ .set(UI_ACLS_ENABLE, true)
+ .set(UI_ADMIN_ACLS, "user1,user2")
+ .set(UI_ADMIN_ACLS_GROUPS, "group1")
.set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
createAndCheck(conf2) { securityManager =>
// Test whether user has permission to access UI.
@@ -687,7 +686,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
// Test neither history ui admin acls nor application acls are configured.
val conf3 = createTestConf()
- .set("spark.history.ui.acls.enable", "true")
+ .set(UI_ACLS_ENABLE, true)
.set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
createAndCheck(conf3) { securityManager =>
// Test whether user has permission to access UI.
@@ -1036,7 +1035,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
private def createTestConf(inMemory: Boolean = false): SparkConf = {
val conf = new SparkConf()
- .set(EVENT_LOG_DIR, testDir.getAbsolutePath())
+ .set(HISTORY_LOG_DIR, testDir.getAbsolutePath())
.set(FAST_IN_PROGRESS_PARSING, true)
if (!inMemory) {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
index e89733a..6b47987 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
@@ -22,21 +22,22 @@ import java.nio.charset.StandardCharsets._
import com.google.common.io.Files
import org.apache.spark._
+import org.apache.spark.internal.config.History._
import org.apache.spark.util.Utils
class HistoryServerArgumentsSuite extends SparkFunSuite {
private val logDir = new File("src/test/resources/spark-events")
private val conf = new SparkConf()
- .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
- .set("spark.history.fs.updateInterval", "1")
+ .set(HISTORY_LOG_DIR, logDir.getAbsolutePath)
+ .set(UPDATE_INTERVAL_S, 1L)
.set("spark.testing", "true")
test("No Arguments Parsing") {
val argStrings = Array.empty[String]
val hsa = new HistoryServerArguments(conf, argStrings)
- assert(conf.get("spark.history.fs.logDirectory") === logDir.getAbsolutePath)
- assert(conf.get("spark.history.fs.updateInterval") === "1")
+ assert(conf.get(HISTORY_LOG_DIR) === logDir.getAbsolutePath)
+ assert(conf.get(UPDATE_INTERVAL_S) === 1L)
assert(conf.get("spark.testing") === "true")
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 2a2d013..a9dee67 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -78,8 +78,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
Utils.deleteRecursively(storeDir)
assert(storeDir.mkdir())
val conf = new SparkConf()
- .set("spark.history.fs.logDirectory", logDir)
- .set("spark.history.fs.update.interval", "0")
+ .set(HISTORY_LOG_DIR, logDir)
+ .set(UPDATE_INTERVAL_S.key, "0")
.set("spark.testing", "true")
.set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
.set("spark.eventLog.logStageExecutorMetrics.enabled", "true")
@@ -416,11 +416,10 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
// allowed refresh rate (1Hz)
stop()
val myConf = new SparkConf()
- .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+ .set(HISTORY_LOG_DIR, logDir.getAbsolutePath)
.set("spark.eventLog.dir", logDir.getAbsolutePath)
- .set("spark.history.fs.update.interval", "1s")
+ .set(UPDATE_INTERVAL_S.key, "1s")
.set("spark.eventLog.enabled", "true")
- .set("spark.history.cache.window", "250ms")
.set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
.remove("spark.testing")
val provider = new FsHistoryProvider(myConf)
@@ -613,8 +612,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
stop()
init(
"spark.ui.filters" -> classOf[FakeAuthFilter].getName(),
- "spark.history.ui.acls.enable" -> "true",
- "spark.history.ui.admin.acls" -> admin)
+ UI_ACLS_ENABLE.key -> "true",
+ UI_ADMIN_ACLS.key -> admin)
val tests = Seq(
(owner, HttpServletResponse.SC_OK),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org