You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2018/12/31 21:35:19 UTC
[spark] branch master updated: [SPARK-26470][CORE] Use ConfigEntry for hardcoded configs for eventLog category
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b1a9b5e [SPARK-26470][CORE] Use ConfigEntry for hardcoded configs for eventLog category
b1a9b5e is described below
commit b1a9b5eff59f64c370cd7388761effdf2152a108
Author: Marco Gaido <ma...@gmail.com>
AuthorDate: Mon Dec 31 13:35:02 2018 -0800
[SPARK-26470][CORE] Use ConfigEntry for hardcoded configs for eventLog category
## What changes were proposed in this pull request?
This PR makes the hardcoded `spark.eventLog` configs use `ConfigEntry` and puts them in the `config` package.
## How was this patch tested?
existing tests
Closes #23395 from mgaido91/SPARK-26470.
Authored-by: Marco Gaido <ma...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
core/src/main/scala/org/apache/spark/SparkContext.scala | 7 +++----
.../scala/org/apache/spark/internal/config/package.scala | 9 +++++++++
.../apache/spark/deploy/history/HistoryServerSuite.scala | 9 +++++----
.../spark/scheduler/EventLoggingListenerSuite.scala | 15 ++++++++-------
4 files changed, 25 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 09cc346..3475859 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -230,7 +230,7 @@ class SparkContext(config: SparkConf) extends Logging {
def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
def appName: String = _conf.get("spark.app.name")
- private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
+ private[spark] def isEventLogEnabled: Boolean = _conf.get(EVENT_LOG_ENABLED)
private[spark] def eventLogDir: Option[URI] = _eventLogDir
private[spark] def eventLogCodec: Option[String] = _eventLogCodec
@@ -396,15 +396,14 @@ class SparkContext(config: SparkConf) extends Logging {
_eventLogDir =
if (isEventLogEnabled) {
- val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
- .stripSuffix("/")
+ val unresolvedDir = conf.get(EVENT_LOG_DIR).stripSuffix("/")
Some(Utils.resolveURI(unresolvedDir))
} else {
None
}
_eventLogCodec = {
- val compress = _conf.getBoolean("spark.eventLog.compress", false)
+ val compress = _conf.get(EVENT_LOG_COMPRESS)
if (compress && isEventLogEnabled) {
Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
} else {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index f1c1c03..d8e9c09 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.scheduler.EventLoggingListener
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.util.Utils
@@ -62,6 +63,14 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val EVENT_LOG_ENABLED = ConfigBuilder("spark.eventLog.enabled")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val EVENT_LOG_DIR = ConfigBuilder("spark.eventLog.dir")
+ .stringConf
+ .createWithDefault(EventLoggingListener.DEFAULT_LOG_DIR)
+
private[spark] val EVENT_LOG_COMPRESS =
ConfigBuilder("spark.eventLog.compress")
.booleanConf
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index a9dee67..96458c5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -45,6 +45,7 @@ import org.scalatest.mockito.MockitoSugar
import org.scalatest.selenium.WebBrowser
import org.apache.spark._
+import org.apache.spark.internal.config._
import org.apache.spark.internal.config.History._
import org.apache.spark.status.api.v1.ApplicationInfo
import org.apache.spark.status.api.v1.JobData
@@ -82,8 +83,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
.set(UPDATE_INTERVAL_S.key, "0")
.set("spark.testing", "true")
.set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
- .set("spark.eventLog.logStageExecutorMetrics.enabled", "true")
- .set("spark.eventLog.logStageExecutorProcessTreeMetrics.enabled", "true")
+ .set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true)
+ .set(EVENT_LOG_PROCESS_TREE_METRICS, true)
conf.setAll(extraConf)
provider = new FsHistoryProvider(conf)
provider.checkForLogs()
@@ -417,9 +418,9 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
stop()
val myConf = new SparkConf()
.set(HISTORY_LOG_DIR, logDir.getAbsolutePath)
- .set("spark.eventLog.dir", logDir.getAbsolutePath)
+ .set(EVENT_LOG_DIR, logDir.getAbsolutePath)
.set(UPDATE_INTERVAL_S.key, "1s")
- .set("spark.eventLog.enabled", "true")
+ .set(EVENT_LOG_ENABLED, true)
.set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
.remove("spark.testing")
val provider = new FsHistoryProvider(myConf)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 0c04a93..04987e6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.io._
import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem}
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -122,7 +123,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
// Expected IOException, since we haven't enabled log overwrite.
intercept[IOException] { testEventLogging() }
// Try again, but enable overwriting.
- testEventLogging(extraConf = Map("spark.eventLog.overwrite" -> "true"))
+ testEventLogging(extraConf = Map(EVENT_LOG_OVERWRITE.key -> "true"))
}
test("Event log name") {
@@ -526,15 +527,15 @@ object EventLoggingListenerSuite {
/** Get a SparkConf with event logging enabled. */
def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None): SparkConf = {
val conf = new SparkConf
- conf.set("spark.eventLog.enabled", "true")
- conf.set("spark.eventLog.logBlockUpdates.enabled", "true")
- conf.set("spark.eventLog.testing", "true")
- conf.set("spark.eventLog.dir", logDir.toString)
+ conf.set(EVENT_LOG_ENABLED, true)
+ conf.set(EVENT_LOG_BLOCK_UPDATES, true)
+ conf.set(EVENT_LOG_TESTING, true)
+ conf.set(EVENT_LOG_DIR, logDir.toString)
compressionCodec.foreach { codec =>
- conf.set("spark.eventLog.compress", "true")
+ conf.set(EVENT_LOG_COMPRESS, true)
conf.set("spark.io.compression.codec", codec)
}
- conf.set("spark.eventLog.logStageExecutorMetrics.enabled", "true")
+ conf.set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true)
conf
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org