You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/10/17 21:30:57 UTC
spark git commit: [SPARK-22050][CORE] Allow BlockUpdated events to be
optionally logged to the event log
Repository: spark
Updated Branches:
refs/heads/master 28f9f3f22 -> 1437e344e
[SPARK-22050][CORE] Allow BlockUpdated events to be optionally logged to the event log
## What changes were proposed in this pull request?
I see that block updates are not logged to the event log.
This makes sense as a default for performance reasons.
However, I find it helpful when trying to get a better understanding of caching for a job to be able to log these updates.
This PR adds a configuration setting `spark.eventLog.logBlockUpdates.enabled` (defaulting to false) which allows block updates to be recorded in the log.
This contribution is original work which is licensed to the Apache Spark project.
## How was this patch tested?
Current and additional unit tests.
Author: Michael Mior <mm...@uwaterloo.ca>
Closes #19263 from michaelmior/log-block-updates.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1437e344
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1437e344
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1437e344
Branch: refs/heads/master
Commit: 1437e344ec0c29a44a19f4513986f5f184c44695
Parents: 28f9f3f
Author: Michael Mior <mm...@uwaterloo.ca>
Authored: Tue Oct 17 14:30:52 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Oct 17 14:30:52 2017 -0700
----------------------------------------------------------------------
.../apache/spark/internal/config/package.scala | 23 +++++++++++++
.../spark/scheduler/EventLoggingListener.scala | 18 +++++++----
.../org/apache/spark/util/JsonProtocol.scala | 34 ++++++++++++++++++--
.../scheduler/EventLoggingListenerSuite.scala | 2 ++
.../apache/spark/util/JsonProtocolSuite.scala | 27 ++++++++++++++++
docs/configuration.md | 8 +++++
6 files changed, 104 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e7b406a..0c36bdc 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -41,6 +41,29 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")
+ private[spark] val EVENT_LOG_COMPRESS =
+ ConfigBuilder("spark.eventLog.compress")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val EVENT_LOG_BLOCK_UPDATES =
+ ConfigBuilder("spark.eventLog.logBlockUpdates.enabled")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val EVENT_LOG_TESTING =
+ ConfigBuilder("spark.eventLog.testing")
+ .internal()
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val EVENT_LOG_OUTPUT_BUFFER_SIZE = ConfigBuilder("spark.eventLog.buffer.kb")
+ .bytesConf(ByteUnit.KiB)
+ .createWithDefaultString("100k")
+
+ private[spark] val EVENT_LOG_OVERWRITE =
+ ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)
+
private[spark] val EXECUTOR_CLASS_PATH =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 9dafa0b..a77adc5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -37,6 +37,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.{SPARK_VERSION, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{JsonProtocol, Utils}
@@ -45,6 +46,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
*
* Event logging is specified by the following configurable parameters:
* spark.eventLog.enabled - Whether event logging is enabled.
+ * spark.eventLog.logBlockUpdates.enabled - Whether to log block updates
* spark.eventLog.compress - Whether to compress logged events
* spark.eventLog.overwrite - Whether to overwrite any existing files.
* spark.eventLog.dir - Path to the directory in which events are logged.
@@ -64,10 +66,11 @@ private[spark] class EventLoggingListener(
this(appId, appAttemptId, logBaseDir, sparkConf,
SparkHadoopUtil.get.newConfiguration(sparkConf))
- private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
- private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
- private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
- private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
+ private val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+ private val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+ private val shouldLogBlockUpdates = sparkConf.get(EVENT_LOG_BLOCK_UPDATES)
+ private val testing = sparkConf.get(EVENT_LOG_TESTING)
+ private val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
private val compressionCodec =
if (shouldCompress) {
@@ -216,8 +219,11 @@ private[spark] class EventLoggingListener(
logEvent(event, flushLogger = true)
}
- // No-op because logging every update would be overkill
- override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}
+ override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
+ if (shouldLogBlockUpdates) {
+ logEvent(event, flushLogger = true)
+ }
+ }
// No-op because logging every update would be overkill
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 8406826..5e60218 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -98,8 +98,8 @@ private[spark] object JsonProtocol {
logStartToJson(logStart)
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
executorMetricsUpdateToJson(metricsUpdate)
- case blockUpdated: SparkListenerBlockUpdated =>
- throw new MatchError(blockUpdated) // TODO(ekl) implement this
+ case blockUpdate: SparkListenerBlockUpdated =>
+ blockUpdateToJson(blockUpdate)
case _ => parse(mapper.writeValueAsString(event))
}
}
@@ -246,6 +246,12 @@ private[spark] object JsonProtocol {
})
}
+ def blockUpdateToJson(blockUpdate: SparkListenerBlockUpdated): JValue = {
+ val blockUpdatedInfo = blockUpdatedInfoToJson(blockUpdate.blockUpdatedInfo)
+ ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockUpdate) ~
+ ("Block Updated Info" -> blockUpdatedInfo)
+ }
+
/** ------------------------------------------------------------------- *
* JSON serialization methods for classes SparkListenerEvents depend on |
* -------------------------------------------------------------------- */
@@ -458,6 +464,14 @@ private[spark] object JsonProtocol {
("Log Urls" -> mapToJson(executorInfo.logUrlMap))
}
+ def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo): JValue = {
+ ("Block Manager ID" -> blockManagerIdToJson(blockUpdatedInfo.blockManagerId)) ~
+ ("Block ID" -> blockUpdatedInfo.blockId.toString) ~
+ ("Storage Level" -> storageLevelToJson(blockUpdatedInfo.storageLevel)) ~
+ ("Memory Size" -> blockUpdatedInfo.memSize) ~
+ ("Disk Size" -> blockUpdatedInfo.diskSize)
+ }
+
/** ------------------------------ *
* Util JSON serialization methods |
* ------------------------------- */
@@ -515,6 +529,7 @@ private[spark] object JsonProtocol {
val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
+ val blockUpdate = Utils.getFormattedClassName(SparkListenerBlockUpdated)
}
def sparkEventFromJson(json: JValue): SparkListenerEvent = {
@@ -538,6 +553,7 @@ private[spark] object JsonProtocol {
case `executorRemoved` => executorRemovedFromJson(json)
case `logStart` => logStartFromJson(json)
case `metricsUpdate` => executorMetricsUpdateFromJson(json)
+ case `blockUpdate` => blockUpdateFromJson(json)
case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
.asInstanceOf[SparkListenerEvent]
}
@@ -676,6 +692,11 @@ private[spark] object JsonProtocol {
SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates)
}
+ def blockUpdateFromJson(json: JValue): SparkListenerBlockUpdated = {
+ val blockUpdatedInfo = blockUpdatedInfoFromJson(json \ "Block Updated Info")
+ SparkListenerBlockUpdated(blockUpdatedInfo)
+ }
+
/** --------------------------------------------------------------------- *
* JSON deserialization methods for classes SparkListenerEvents depend on |
* ---------------------------------------------------------------------- */
@@ -989,6 +1010,15 @@ private[spark] object JsonProtocol {
new ExecutorInfo(executorHost, totalCores, logUrls)
}
+ def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = {
+ val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
+ val blockId = BlockId((json \ "Block ID").extract[String])
+ val storageLevel = storageLevelFromJson(json \ "Storage Level")
+ val memorySize = (json \ "Memory Size").extract[Long]
+ val diskSize = (json \ "Disk Size").extract[Long]
+ BlockUpdatedInfo(blockManagerId, blockId, storageLevel, memorySize, diskSize)
+ }
+
/** -------------------------------- *
* Util JSON deserialization methods |
* --------------------------------- */
http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 6b42775..a9e92fa 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -228,6 +228,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
SparkListenerStageCompleted,
SparkListenerTaskStart,
SparkListenerTaskEnd,
+ SparkListenerBlockUpdated,
SparkListenerApplicationEnd).map(Utils.getFormattedClassName)
Utils.tryWithSafeFinally {
val logStart = SparkListenerLogStart(SPARK_VERSION)
@@ -291,6 +292,7 @@ object EventLoggingListenerSuite {
def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None): SparkConf = {
val conf = new SparkConf
conf.set("spark.eventLog.enabled", "true")
+ conf.set("spark.eventLog.logBlockUpdates.enabled", "true")
conf.set("spark.eventLog.testing", "true")
conf.set("spark.eventLog.dir", logDir.toString)
compressionCodec.foreach { codec =>
http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index a1a8587..4abbb8e 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -96,6 +96,9 @@ class JsonProtocolSuite extends SparkFunSuite {
.zipWithIndex.map { case (a, i) => a.copy(id = i) }
SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates)))
}
+ val blockUpdated =
+ SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId("Stars",
+ "In your multitude...", 300), RDDBlockId(0, 0), StorageLevel.MEMORY_ONLY, 100L, 0L))
testEvent(stageSubmitted, stageSubmittedJsonString)
testEvent(stageCompleted, stageCompletedJsonString)
@@ -120,6 +123,7 @@ class JsonProtocolSuite extends SparkFunSuite {
testEvent(nodeBlacklisted, nodeBlacklistedJsonString)
testEvent(nodeUnblacklisted, nodeUnblacklistedJsonString)
testEvent(executorMetricsUpdate, executorMetricsUpdateJsonString)
+ testEvent(blockUpdated, blockUpdatedJsonString)
}
test("Dependent Classes") {
@@ -2007,6 +2011,29 @@ private[spark] object JsonProtocolSuite extends Assertions {
|}
""".stripMargin
+ private val blockUpdatedJsonString =
+ """
+ |{
+ | "Event": "SparkListenerBlockUpdated",
+ | "Block Updated Info": {
+ | "Block Manager ID": {
+ | "Executor ID": "Stars",
+ | "Host": "In your multitude...",
+ | "Port": 300
+ | },
+ | "Block ID": "rdd_0_0",
+ | "Storage Level": {
+ | "Use Disk": false,
+ | "Use Memory": true,
+ | "Deserialized": true,
+ | "Replication": 1
+ | },
+ | "Memory Size": 100,
+ | "Disk Size": 0
+ | }
+ |}
+ """.stripMargin
+
private val executorBlacklistedJsonString =
s"""
|{
http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index bb06c8f..7b9e16a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -715,6 +715,14 @@ Apart from these, the following properties are also available, and may be useful
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
+ <td><code>spark.eventLog.logBlockUpdates.enabled</code></td>
+ <td>false</td>
+ <td>
+ Whether to log events for every block update, if <code>spark.eventLog.enabled</code> is true.
+ *Warning*: This will increase the size of the event log considerably.
+ </td>
+</tr>
+<tr>
<td><code>spark.eventLog.compress</code></td>
<td>false</td>
<td>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org