Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/09/16 19:02:11 UTC

[GitHub] [spark] mridulm commented on a change in pull request #34000: [SPARK-36620][SHUFFLE] Add client side push based shuffle metrics

mridulm commented on a change in pull request #34000:
URL: https://github.com/apache/spark/pull/34000#discussion_r710379299



##########
File path: core/src/main/scala/org/apache/spark/InternalAccumulator.scala
##########
@@ -54,13 +56,26 @@ private[spark] object InternalAccumulator {
     val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead"
     val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
     val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
+    val CORRUPT_MERGED_BLOCK_CHUNKS = SHUFFLE_PUSH_READ_METRICS_PREFIX + "corruptMergedBlockChunks"
+    val FALLBACK_COUNT = SHUFFLE_PUSH_READ_METRICS_PREFIX + "fallbackCount"
+    val REMOTE_MERGED_BLOCKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedBlocksFetched"
+    val LOCAL_MERGED_BLOCKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "localMergedBlocksFetched"
+    val REMOTE_MERGED_CHUNKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedChunksFetched"
+    val LOCAL_MERGED_CHUNKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "localMergedChunksFetched"
+    val REMOTE_MERGED_BLOCKS_READ = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedBytesRead"
+    val LOCAL_MERGED_BLOCKS_READ = SHUFFLE_PUSH_READ_METRICS_PREFIX + "localMergedBytesRead"
+    val REMOTE_REQS_DURATION = SHUFFLE_READ_METRICS_PREFIX + "remoteReqsDuration"

Review comment:
       Move this up to sit with the other `SHUFFLE_READ_METRICS_PREFIX` metrics ?
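   i.e. keep the `SHUFFLE_READ_METRICS_PREFIX` constants together; a sketch of the suggested ordering (not the final layout):

   ```scala
   val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
   val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
   val REMOTE_REQS_DURATION = SHUFFLE_READ_METRICS_PREFIX + "remoteReqsDuration"
   // push-based shuffle read metrics follow, all using SHUFFLE_PUSH_READ_METRICS_PREFIX
   val CORRUPT_MERGED_BLOCK_CHUNKS = SHUFFLE_PUSH_READ_METRICS_PREFIX + "corruptMergedBlockChunks"
   ```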

##########
File path: core/src/main/scala/org/apache/spark/InternalAccumulator.scala
##########
@@ -54,13 +56,26 @@ private[spark] object InternalAccumulator {
     val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead"
     val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
     val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
+    val CORRUPT_MERGED_BLOCK_CHUNKS = SHUFFLE_PUSH_READ_METRICS_PREFIX + "corruptMergedBlockChunks"
+    val FALLBACK_COUNT = SHUFFLE_PUSH_READ_METRICS_PREFIX + "fallbackCount"

Review comment:
       Thoughts on renaming this to be more descriptive ?
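   For example, something that says what actually falls back (illustrative name only):

   ```scala
   val MERGED_FETCH_FALLBACK_COUNT = SHUFFLE_PUSH_READ_METRICS_PREFIX + "mergedFetchFallbackCount"
   ```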

##########
File path: core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
##########
@@ -192,7 +192,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     // Enable "spark.eventLog.logBlockUpdates.enabled", to get the storage information
     // in the history server.
     "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0",
-    "miscellaneous process" -> "applications/application_1555004656427_0144/allmiscellaneousprocess"
+    "miscellaneous process" ->
+      "applications/application_1555004656427_0144/allmiscellaneousprocess"

Review comment:
       Revert ? This looks like an unrelated formatting-only change.

##########
File path: core/src/main/scala/org/apache/spark/executor/Executor.scala
##########
@@ -607,6 +613,26 @@ private[spark] class Executor(
         executorSource.METRIC_RESULT_SIZE.inc(task.metrics.resultSize)
         executorSource.METRIC_DISK_BYTES_SPILLED.inc(task.metrics.diskBytesSpilled)
         executorSource.METRIC_MEMORY_BYTES_SPILLED.inc(task.metrics.memoryBytesSpilled)
+        executorSource.METRIC_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS
+          .inc(task.metrics.shuffleReadMetrics.corruptMergedBlockChunks)
+        executorSource.METRIC_SHUFFLE_FALLBACK_COUNT
+          .inc(task.metrics.shuffleReadMetrics.fallbackCount)
+        executorSource.METRIC_SHUFFLE_MERGED_REMOTE_BLOCKS_FETCHED
+          .inc(task.metrics.shuffleReadMetrics.remoteMergedBlocksFetched)
+        executorSource.METRIC_SHUFFLE_MERGED_LOCAL_BLOCKS_FETCHED
+          .inc(task.metrics.shuffleReadMetrics.localMergedBlocksFetched)
+        executorSource.METRIC_SHUFFLE_MERGED_REMOTE_CHUNKS_FETCHED
+          .inc(task.metrics.shuffleReadMetrics.remoteMergedChunksFetched)
+        executorSource.METRIC_SHUFFLE_MERGED_LOCAL_CHUNKS_FETCHED
+          .inc(task.metrics.shuffleReadMetrics.localMergedChunksFetched)
+        executorSource.METRIC_SHUFFLE_MERGED_REMOTE_BYTES_READ
+          .inc(task.metrics.shuffleReadMetrics.remoteMergedBlocksBytesRead)
+        executorSource.METRIC_SHUFFLE_MERGED_LOCAL_BYTES_READ
+          .inc(task.metrics.shuffleReadMetrics.localMergedBlocksBytesRead)
+        executorSource.METRIC_SHUFFLE_REMOTE_REQS_DURATION
+          .inc(task.metrics.shuffleReadMetrics.remoteReqsDuration)
+        executorSource.METRIC_SHUFFLE_MERGED_REMOTE_REQS_DURATION
+          .inc(task.metrics.shuffleReadMetrics.remoteMergedReqsDuration)

Review comment:
       @Ngone51 Do we want to move these increments into the metrics themselves ?
   Otherwise, the executor will need to be updated for every metric we add ...
   
   Something like:
   * Add a `private[spark]` base trait like `SparkMetrics` for `ShuffleReadMetrics`/`ShuffleWriteMetrics`/etc. that exposes a `private[spark] def update(src: ExecutorSource)`
   * And in `Executor` we simply do `for (metrics <- task.allMetrics) metrics.update(executorSource)`
   
   
   Thoughts ?
   
   (Also, +CC @jiangxb1987 since you reviewed some of the executor source changes in the past)
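
   A minimal self-contained sketch of the shape I mean (all names here, `SparkMetrics`, `allMetrics`, and the simplified `Counter`/`ExecutorSourceLike`, are hypothetical stand-ins, not the actual Spark types):

   ```scala
   trait SparkMetrics {
     // Each metrics class publishes its own counters, so the executor loop
     // stays unchanged as new metrics are added.
     def update(src: ExecutorSourceLike): Unit
   }

   // Simplified stand-ins for the Codahale counters / ExecutorSource.
   class Counter { private var n = 0L; def inc(v: Long): Unit = n += v; def count: Long = n }
   class ExecutorSourceLike {
     val METRIC_SHUFFLE_FALLBACK_COUNT = new Counter
     val METRIC_SHUFFLE_MERGED_REMOTE_BLOCKS_FETCHED = new Counter
   }

   class ShuffleReadMetricsLike(fallbackCount: Long, remoteMergedBlocksFetched: Long)
       extends SparkMetrics {
     override def update(src: ExecutorSourceLike): Unit = {
       src.METRIC_SHUFFLE_FALLBACK_COUNT.inc(fallbackCount)
       src.METRIC_SHUFFLE_MERGED_REMOTE_BLOCKS_FETCHED.inc(remoteMergedBlocksFetched)
     }
   }

   object Demo extends App {
     val executorSource = new ExecutorSourceLike
     val allMetrics = Seq(new ShuffleReadMetricsLike(2L, 10L))
     // What the per-metric block in Executor would reduce to:
     for (metrics <- allMetrics) metrics.update(executorSource)
     assert(executorSource.METRIC_SHUFFLE_FALLBACK_COUNT.count == 2L)
   }
   ```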

##########
File path: core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
##########
@@ -1103,6 +1107,9 @@ private[spark] object JsonProtocolSuite extends Assertions {
       sw.incBytesWritten(a + b + c)
       sw.incWriteTime(b + c + d)
       sw.incRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
+      sw.incBlocksNotPushed(if (b > a) b - a else a - b)
+      sw.incBlocksCollided(if (b > a) b - a else a - b)
+      sw.incBlocksTooLate(if (b > a) b - a else a - b)

Review comment:
       nit: use `abs` instead ?
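   e.g., with the same `a`/`b` vals from the surrounding test helper:

   ```scala
   sw.incBlocksNotPushed(math.abs(b - a))
   sw.incBlocksCollided(math.abs(b - a))
   sw.incBlocksTooLate(math.abs(b - a))
   ```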

##########
File path: core/src/main/scala/org/apache/spark/status/api/v1/api.scala
##########
@@ -275,9 +275,22 @@ class StageData private[spark](
     val shuffleLocalBytesRead: Long,
     val shuffleReadBytes: Long,
     val shuffleReadRecords: Long,
+    val shuffleCorruptMergedBlockChunks: Long,
+    val shuffleFallbackCount: Long,
+    val shuffleMergedRemoteBlocksFetched: Long,
+    val shuffleMergedLocalBlocksFetched: Long,
+    val shuffleMergedRemoteChunksFetched: Long,
+    val shuffleMergedLocalChunksFetched: Long,
+    val shuffleMergedRemoteBytesRead: Long,
+    val shuffleMergedLocalBytesRead: Long,
+    val shuffleRemoteReqsDuration: Long,
+    val shuffleMergedRemoteReqsDuration: Long,
     val shuffleWriteBytes: Long,
     val shuffleWriteTime: Long,
     val shuffleWriteRecords: Long,
+    val shuffleBlocksNotPushed: Long,
+    val shuffleBlocksCollided: Long,
+    val shuffleBlocksTooLate: Long,
 

Review comment:
       Rename these fields to make it clear they are related to push-based shuffle ? (here and elsewhere in the UI/API/ShuffleReadMetricsReporter/storeTypes/etc.)
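   For example (names are illustrative only, not a concrete proposal):

   ```scala
   val shuffleMergedFetchFallbackCount: Long,      // instead of shuffleFallbackCount
   val shufflePushCorruptMergedBlockChunks: Long,  // instead of shuffleCorruptMergedBlockChunks
   ```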

##########
File path: core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
##########
@@ -84,13 +94,77 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
    */
   def totalBlocksFetched: Long = remoteBlocksFetched + localBlocksFetched
 
+  /**
+   * Number of corrupt merged shuffle block chunks encountered by this task (remote or local).
+   */
+  def corruptMergedBlockChunks: Long = _corruptMergedBlockChunks.sum
+
+  /**
+   * Number of times the task had to fall back to fetching original shuffle blocks for a merged
+   * shuffle block chunk (remote or local).
+   */
+  def fallbackCount: Long = _fallbackCount.sum
+
+  /**
+   * Number of remote merged blocks fetched in this shuffle by this task.

Review comment:
       nit: remove the 'in this shuffle by this task' suffix ?
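   i.e. something like:

   ```scala
   /**
    * Number of remote merged blocks fetched.
    */
   def remoteMergedBlocksFetched: Long = _remoteMergedBlocksFetched.sum
   ```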

##########
File path: core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
##########
@@ -84,13 +94,77 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
    */
   def totalBlocksFetched: Long = remoteBlocksFetched + localBlocksFetched
 
+  /**
+   * Number of corrupt merged shuffle block chunks encountered by this task (remote or local).
+   */
+  def corruptMergedBlockChunks: Long = _corruptMergedBlockChunks.sum
+
+  /**
+   * Number of times the task had to fall back to fetching original shuffle blocks for a merged
+   * shuffle block chunk (remote or local).
+   */
+  def fallbackCount: Long = _fallbackCount.sum
+
+  /**
+   * Number of remote merged blocks fetched in this shuffle by this task.
+   */
+  def remoteMergedBlocksFetched: Long = _remoteMergedBlocksFetched.sum
+
+  /**
+   * Number of local merged blocks fetched in this shuffle by this task.
+   */
+  def localMergedBlocksFetched: Long = _localMergedBlocksFetched.sum
+
+  /**
+   * Number of remote merged chunks fetched in this shuffle by this task.
+   */
+  def remoteMergedChunksFetched: Long = _remoteMergedChunksFetched.sum
+
+  /**
+   * Number of local merged chunks fetched in this shuffle by this task.
+   */
+  def localMergedChunksFetched: Long = _localMergedChunksFetched.sum
+
+  /**
+   * Total number of remote merged bytes read from the shuffle by this task.
+   */
+  def remoteMergedBlocksBytesRead: Long = _remoteMergedBlocksBytesRead.sum
+
+  /**
+   * Total number of local merged bytes read from the shuffle by this task.
+   */
+  def localMergedBlocksBytesRead: Long = _localMergedBlocksBytesRead.sum
+
+  /**
+   * Total time taken for remote requests to complete by this task. This doesn't include
+   * duration of remote merged requests.

Review comment:
       Do we want to make this the total duration of all remote requests, and compute the non-merged requests' duration as `remoteReqsDuration - remoteMergedReqsDuration` ?
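   With that semantics, the non-merged portion stays derivable (sketch, assuming both accumulators end up on `ShuffleReadMetrics`):

   ```scala
   val nonMergedRemoteReqsDuration =
     metrics.remoteReqsDuration - metrics.remoteMergedReqsDuration
   ```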



