Posted to commits@spark.apache.org by va...@apache.org on 2017/10/17 21:30:57 UTC

spark git commit: [SPARK-22050][CORE] Allow BlockUpdated events to be optionally logged to the event log

Repository: spark
Updated Branches:
  refs/heads/master 28f9f3f22 -> 1437e344e


[SPARK-22050][CORE] Allow BlockUpdated events to be optionally logged to the event log

## What changes were proposed in this pull request?

Block updates are currently not logged to the event log, which makes sense as a default for performance reasons.
However, when trying to get a better understanding of caching for a job, it is helpful to be able to log these updates.
This PR adds a configuration setting, `spark.eventLog.logBlockUpdates.enabled` (defaulting to `false`), which allows block updates to be recorded in the event log.
This contribution is original work which is licensed to the Apache Spark project.
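
As a minimal usage sketch (the app name and log directory below are illustrative, not part of this change), the new setting is enabled alongside regular event logging:

```scala
import org.apache.spark.SparkConf

// Event logging must itself be enabled for block updates to be recorded.
val conf = new SparkConf()
  .setAppName("block-update-logging-demo")         // illustrative name
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "/tmp/spark-events")  // illustrative path
  .set("spark.eventLog.logBlockUpdates.enabled", "true")
```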

## How was this patch tested?

Existing unit tests, plus new unit tests covering the added configuration entry and JSON round-tripping of block update events.

Author: Michael Mior <mm...@uwaterloo.ca>

Closes #19263 from michaelmior/log-block-updates.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1437e344
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1437e344
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1437e344

Branch: refs/heads/master
Commit: 1437e344ec0c29a44a19f4513986f5f184c44695
Parents: 28f9f3f
Author: Michael Mior <mm...@uwaterloo.ca>
Authored: Tue Oct 17 14:30:52 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Oct 17 14:30:52 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/internal/config/package.scala  | 23 +++++++++++++
 .../spark/scheduler/EventLoggingListener.scala  | 18 +++++++----
 .../org/apache/spark/util/JsonProtocol.scala    | 34 ++++++++++++++++++--
 .../scheduler/EventLoggingListenerSuite.scala   |  2 ++
 .../apache/spark/util/JsonProtocolSuite.scala   | 27 ++++++++++++++++
 docs/configuration.md                           |  8 +++++
 6 files changed, 104 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e7b406a..0c36bdc 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -41,6 +41,29 @@ package object config {
     .bytesConf(ByteUnit.MiB)
     .createWithDefaultString("1g")
 
+  private[spark] val EVENT_LOG_COMPRESS =
+    ConfigBuilder("spark.eventLog.compress")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val EVENT_LOG_BLOCK_UPDATES =
+    ConfigBuilder("spark.eventLog.logBlockUpdates.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val EVENT_LOG_TESTING =
+    ConfigBuilder("spark.eventLog.testing")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val EVENT_LOG_OUTPUT_BUFFER_SIZE = ConfigBuilder("spark.eventLog.buffer.kb")
+    .bytesConf(ByteUnit.KiB)
+    .createWithDefaultString("100k")
+
+  private[spark] val EVENT_LOG_OVERWRITE =
+    ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)
+
   private[spark] val EXECUTOR_CLASS_PATH =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 9dafa0b..a77adc5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -37,6 +37,7 @@ import org.json4s.jackson.JsonMethods._
 import org.apache.spark.{SPARK_VERSION, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{JsonProtocol, Utils}
 
@@ -45,6 +46,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  *
  * Event logging is specified by the following configurable parameters:
  *   spark.eventLog.enabled - Whether event logging is enabled.
+ *   spark.eventLog.logBlockUpdates.enabled - Whether to log block updates
  *   spark.eventLog.compress - Whether to compress logged events
  *   spark.eventLog.overwrite - Whether to overwrite any existing files.
  *   spark.eventLog.dir - Path to the directory in which events are logged.
@@ -64,10 +66,11 @@ private[spark] class EventLoggingListener(
     this(appId, appAttemptId, logBaseDir, sparkConf,
       SparkHadoopUtil.get.newConfiguration(sparkConf))
 
-  private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
-  private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
-  private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
-  private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
+  private val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
+  private val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
+  private val shouldLogBlockUpdates = sparkConf.get(EVENT_LOG_BLOCK_UPDATES)
+  private val testing = sparkConf.get(EVENT_LOG_TESTING)
+  private val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
   private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
   private val compressionCodec =
     if (shouldCompress) {
@@ -216,8 +219,11 @@ private[spark] class EventLoggingListener(
     logEvent(event, flushLogger = true)
   }
 
-  // No-op because logging every update would be overkill
-  override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}
+  override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
+    if (shouldLogBlockUpdates) {
+      logEvent(event, flushLogger = true)
+    }
+  }
 
   // No-op because logging every update would be overkill
   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
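
Aside from replaying the event log, block updates can also be observed live by registering a SparkListener that overrides the same onBlockUpdated hook. A minimal sketch (the listener class name is illustrative, and `sc` is assumed to be an existing SparkContext):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockUpdated}

    // Prints each block update as it is delivered on the listener bus.
    class BlockUpdatePrinter extends SparkListener {
      override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
        val info = event.blockUpdatedInfo
        println(s"block=${info.blockId} manager=${info.blockManagerId.hostPort} " +
          s"level=${info.storageLevel} mem=${info.memSize} disk=${info.diskSize}")
      }
    }

    // Registration on an existing context:
    // sc.addSparkListener(new BlockUpdatePrinter())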

http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 8406826..5e60218 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -98,8 +98,8 @@ private[spark] object JsonProtocol {
         logStartToJson(logStart)
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
         executorMetricsUpdateToJson(metricsUpdate)
-      case blockUpdated: SparkListenerBlockUpdated =>
-        throw new MatchError(blockUpdated)  // TODO(ekl) implement this
+      case blockUpdate: SparkListenerBlockUpdated =>
+        blockUpdateToJson(blockUpdate)
       case _ => parse(mapper.writeValueAsString(event))
     }
   }
@@ -246,6 +246,12 @@ private[spark] object JsonProtocol {
     })
   }
 
+  def blockUpdateToJson(blockUpdate: SparkListenerBlockUpdated): JValue = {
+    val blockUpdatedInfo = blockUpdatedInfoToJson(blockUpdate.blockUpdatedInfo)
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockUpdate) ~
+    ("Block Updated Info" -> blockUpdatedInfo)
+  }
+
   /** ------------------------------------------------------------------- *
    * JSON serialization methods for classes SparkListenerEvents depend on |
    * -------------------------------------------------------------------- */
@@ -458,6 +464,14 @@ private[spark] object JsonProtocol {
     ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
   }
 
+  def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo): JValue = {
+    ("Block Manager ID" -> blockManagerIdToJson(blockUpdatedInfo.blockManagerId)) ~
+    ("Block ID" -> blockUpdatedInfo.blockId.toString) ~
+    ("Storage Level" -> storageLevelToJson(blockUpdatedInfo.storageLevel)) ~
+    ("Memory Size" -> blockUpdatedInfo.memSize) ~
+    ("Disk Size" -> blockUpdatedInfo.diskSize)
+  }
+
   /** ------------------------------ *
    * Util JSON serialization methods |
    * ------------------------------- */
@@ -515,6 +529,7 @@ private[spark] object JsonProtocol {
     val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
     val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
     val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
+    val blockUpdate = Utils.getFormattedClassName(SparkListenerBlockUpdated)
   }
 
   def sparkEventFromJson(json: JValue): SparkListenerEvent = {
@@ -538,6 +553,7 @@ private[spark] object JsonProtocol {
       case `executorRemoved` => executorRemovedFromJson(json)
       case `logStart` => logStartFromJson(json)
       case `metricsUpdate` => executorMetricsUpdateFromJson(json)
+      case `blockUpdate` => blockUpdateFromJson(json)
       case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
         .asInstanceOf[SparkListenerEvent]
     }
@@ -676,6 +692,11 @@ private[spark] object JsonProtocol {
     SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates)
   }
 
+  def blockUpdateFromJson(json: JValue): SparkListenerBlockUpdated = {
+    val blockUpdatedInfo = blockUpdatedInfoFromJson(json \ "Block Updated Info")
+    SparkListenerBlockUpdated(blockUpdatedInfo)
+  }
+
   /** --------------------------------------------------------------------- *
    * JSON deserialization methods for classes SparkListenerEvents depend on |
    * ---------------------------------------------------------------------- */
@@ -989,6 +1010,15 @@ private[spark] object JsonProtocol {
     new ExecutorInfo(executorHost, totalCores, logUrls)
   }
 
+  def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = {
+    val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
+    val blockId = BlockId((json \ "Block ID").extract[String])
+    val storageLevel = storageLevelFromJson(json \ "Storage Level")
+    val memorySize = (json \ "Memory Size").extract[Long]
+    val diskSize = (json \ "Disk Size").extract[Long]
+    BlockUpdatedInfo(blockManagerId, blockId, storageLevel, memorySize, diskSize)
+  }
+
   /** -------------------------------- *
    * Util JSON deserialization methods |
    * --------------------------------- */

http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 6b42775..a9e92fa 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -228,6 +228,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
       SparkListenerStageCompleted,
       SparkListenerTaskStart,
       SparkListenerTaskEnd,
+      SparkListenerBlockUpdated,
       SparkListenerApplicationEnd).map(Utils.getFormattedClassName)
     Utils.tryWithSafeFinally {
       val logStart = SparkListenerLogStart(SPARK_VERSION)
@@ -291,6 +292,7 @@ object EventLoggingListenerSuite {
   def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None): SparkConf = {
     val conf = new SparkConf
     conf.set("spark.eventLog.enabled", "true")
+    conf.set("spark.eventLog.logBlockUpdates.enabled", "true")
     conf.set("spark.eventLog.testing", "true")
     conf.set("spark.eventLog.dir", logDir.toString)
     compressionCodec.foreach { codec =>

http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index a1a8587..4abbb8e 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -96,6 +96,9 @@ class JsonProtocolSuite extends SparkFunSuite {
           .zipWithIndex.map { case (a, i) => a.copy(id = i) }
       SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates)))
     }
+    val blockUpdated =
+      SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId("Stars",
+        "In your multitude...", 300), RDDBlockId(0, 0), StorageLevel.MEMORY_ONLY, 100L, 0L))
 
     testEvent(stageSubmitted, stageSubmittedJsonString)
     testEvent(stageCompleted, stageCompletedJsonString)
@@ -120,6 +123,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testEvent(nodeBlacklisted, nodeBlacklistedJsonString)
     testEvent(nodeUnblacklisted, nodeUnblacklistedJsonString)
     testEvent(executorMetricsUpdate, executorMetricsUpdateJsonString)
+    testEvent(blockUpdated, blockUpdatedJsonString)
   }
 
   test("Dependent Classes") {
@@ -2007,6 +2011,29 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |}
     """.stripMargin
 
+  private val blockUpdatedJsonString =
+    """
+      |{
+      |  "Event": "SparkListenerBlockUpdated",
+      |  "Block Updated Info": {
+      |    "Block Manager ID": {
+      |      "Executor ID": "Stars",
+      |      "Host": "In your multitude...",
+      |      "Port": 300
+      |    },
+      |    "Block ID": "rdd_0_0",
+      |    "Storage Level": {
+      |      "Use Disk": false,
+      |      "Use Memory": true,
+      |      "Deserialized": true,
+      |      "Replication": 1
+      |    },
+      |    "Memory Size": 100,
+      |    "Disk Size": 0
+      |  }
+      |}
+    """.stripMargin
+
   private val executorBlacklistedJsonString =
     s"""
       |{

http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index bb06c8f..7b9e16a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -715,6 +715,14 @@ Apart from these, the following properties are also available, and may be useful
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
+  <td><code>spark.eventLog.logBlockUpdates.enabled</code></td>
+  <td>false</td>
+  <td>
+    Whether to log events for every block update, if <code>spark.eventLog.enabled</code> is true.
+    *Warning*: This will increase the size of the event log considerably.
+  </td>
+</tr>
+<tr>
   <td><code>spark.eventLog.compress</code></td>
   <td>false</td>
   <td>


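Each line of an (uncompressed) event log written by this listener is a single JSON object in the format shown in the test suite above, so the new SparkListenerBlockUpdated events can be pulled out with any JSON parser. A minimal sketch using json4s, which Spark already depends on (the log path passed in args(0) is a placeholder for a file such as /tmp/spark-events/<appId>):

    import scala.io.Source
    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    object BlockUpdateScan {
      implicit val formats: Formats = DefaultFormats

      def main(args: Array[String]): Unit = {
        // args(0): path to an uncompressed event log file.
        Source.fromFile(args(0)).getLines()
          .map(line => parse(line))
          .filter(json => (json \ "Event").extract[String] == "SparkListenerBlockUpdated")
          .foreach { json =>
            val info = json \ "Block Updated Info"
            println(s"""${(info \ "Block ID").extract[String]}: """ +
              s"""mem=${(info \ "Memory Size").extract[Long]} """ +
              s"""disk=${(info \ "Disk Size").extract[Long]}""")
          }
      }
    }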