You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/26 06:25:11 UTC

[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r978229580


##########
core/src/main/scala/org/apache/spark/InternalAccumulator.scala:
##########
@@ -54,13 +56,26 @@ private[spark] object InternalAccumulator {
     val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead"
     val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
     val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
+    val REMOTE_REQS_DURATION = SHUFFLE_READ_METRICS_PREFIX + "remoteReqsDuration"
+    val CORRUPT_MERGED_BLOCK_CHUNKS = SHUFFLE_PUSH_READ_METRICS_PREFIX + "corruptMergedBlockChunks"
+    val FALLBACK_COUNT = SHUFFLE_PUSH_READ_METRICS_PREFIX + "fallbackCount"
+    val REMOTE_MERGED_BLOCKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedBlocksFetched"
+    val LOCAL_MERGED_BLOCKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "localMergedBlocksFetched"
+    val REMOTE_MERGED_CHUNKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedChunksFetched"
+    val LOCAL_MERGED_CHUNKS_FETCHED = SHUFFLE_PUSH_READ_METRICS_PREFIX + "localMergedChunksFetched"
+    val REMOTE_MERGED_BLOCKS_READ = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedBytesRead"
+    val LOCAL_MERGED_BLOCKS_READ = SHUFFLE_PUSH_READ_METRICS_PREFIX + "localMergedBytesRead"
+    val REMOTE_MERGED_REQS_DURATION = SHUFFLE_PUSH_READ_METRICS_PREFIX + "remoteMergedReqsDuration"

Review Comment:
   Can we check if all the metrics are getting populated ?
   A cursory check indicated that the following are not populated:
   
   * `remoteReqsDuration`
   * `remoteMergedReqsDuration`
   
   Also, make `remoteMergedBytesRead` and `localMergedBytesRead` consistent with the names of their corresponding accumulators ?



##########
core/src/main/scala/org/apache/spark/status/storeTypes.scala:
##########
@@ -233,6 +243,38 @@ private[spark] class TaskDataWrapper(
     val shuffleLocalBytesRead: Long,
     @KVIndexParam(value = TaskIndexNames.SHUFFLE_READ_RECORDS, parent = TaskIndexNames.STAGE)
     val shuffleRecordsRead: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS,
+      parent = TaskIndexNames.STAGE)
+    val shuffleCorruptMergedBlockChunks: Long,
+    @KVIndexParam(value = TaskIndexNames.PUSH_BASED_SHUFFLE_FALLBACK_COUNT,
+      parent = TaskIndexNames.STAGE)
+    val shuffleFallbackCount: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedRemoteBlocksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedLocalBlocksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedRemoteChunksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedLocalChunksFetched: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_READS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedRemoteBlocksBytesRead: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_READS, parent = TaskIndexNames.STAGE)
+    val shuffleMergedLocalBlocksBytesRead: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_REMOTE_REQS_DURATION, parent = TaskIndexNames.STAGE)
+    val shuffleRemoteReqsDuration: Long,
+    @KVIndexParam(
+      value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION,
+      parent = TaskIndexNames.STAGE)

Review Comment:
   Do we need to index any of these ? None of these are sortable columns in the UI right ?



##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -654,6 +654,27 @@ private[spark] class Executor(
         executorSource.METRIC_RESULT_SIZE.inc(task.metrics.resultSize)
         executorSource.METRIC_DISK_BYTES_SPILLED.inc(task.metrics.diskBytesSpilled)
         executorSource.METRIC_MEMORY_BYTES_SPILLED.inc(task.metrics.memoryBytesSpilled)
+        executorSource.METRIC_PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS
+          .inc(task.metrics.shuffleReadMetrics.corruptMergedBlockChunks)
+        executorSource.METRIC_PUSH_BASED_SHUFFLE_FALLBACK_COUNT
+          .inc(task.metrics.shuffleReadMetrics.fallbackCount)
+        executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS_FETCHED
+          .inc(task.metrics.shuffleReadMetrics.remoteMergedBlocksFetched)
+        executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS_FETCHED
+          .inc(task.metrics.shuffleReadMetrics.localMergedBlocksFetched)
+        executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS_FETCHED
+          .inc(task.metrics.shuffleReadMetrics.remoteMergedChunksFetched)
+        executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS_FETCHED
+          .inc(task.metrics.shuffleReadMetrics.localMergedChunksFetched)
+        executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_BYTES_READ
+          .inc(task.metrics.shuffleReadMetrics.remoteMergedBlocksBytesRead)
+
+        executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS_FETCHED
+          .inc(task.metrics.shuffleReadMetrics.localMergedBlocksBytesRead)
+        executorSource.METRIC_PUSH_BASED_SHUFFLE_REMOTE_REQS_DURATION
+          .inc(task.metrics.shuffleReadMetrics.remoteReqsDuration)
+        executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION
+          .inc(task.metrics.shuffleReadMetrics.remoteMergedReqsDuration)

Review Comment:
   Given the number of shuffle related metrics getting updated now - move all `METRIC_*SHUFFLE_*` metric updates to a different method ? Both the newly introduced metrics and the existing `METRIC_SHUFFLE_` metrics
   



##########
core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala:
##########
@@ -43,7 +44,8 @@ private class PushBasedFetchHelper(
    private val iterator: ShuffleBlockFetcherIterator,
    private val shuffleClient: BlockStoreClient,
    private val blockManager: BlockManager,
-   private val mapOutputTracker: MapOutputTracker) extends Logging {
+   private val mapOutputTracker: MapOutputTracker,
+  private val shuffleMetrics: ShuffleReadMetricsReporter) extends Logging {

Review Comment:
   fix indentation



##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -722,6 +718,63 @@ final class ShuffleBlockFetcherIterator(
     }
   }
 
+  // Number of map blocks in a ShuffleBlockChunk
+  private def getShuffleChunkCardinality(blockId: ShuffleBlockChunkId): Int = {
+    val chunkTracker = pushBasedFetchHelper.getRoaringBitMap(blockId)
+    chunkTracker match {
+      case Some(bitmap) => bitmap.getCardinality
+      case None => 0
+    }
+  }
+
+  // Check if the merged block is local to the host or not
+  private def isLocalMergedBlockAddress(address: BlockManagerId): Boolean = {
+    address.executorId.isEmpty && address.host == blockManager.blockManagerId.host
+  }
+
+  private def shuffleMetricsUpdate(
+    blockId: BlockId,
+    buf: ManagedBuffer,
+    isLocal: Boolean): Unit = {

Review Comment:
   nit: `isLocal` -> `local`



##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -722,6 +718,63 @@ final class ShuffleBlockFetcherIterator(
     }
   }
 
+  // Number of map blocks in a ShuffleBlockChunk
+  private def getShuffleChunkCardinality(blockId: ShuffleBlockChunkId): Int = {
+    val chunkTracker = pushBasedFetchHelper.getRoaringBitMap(blockId)
+    chunkTracker match {
+      case Some(bitmap) => bitmap.getCardinality
+      case None => 0
+    }
+  }
+
+  // Check if the merged block is local to the host or not
+  private def isLocalMergedBlockAddress(address: BlockManagerId): Boolean = {
+    address.executorId.isEmpty && address.host == blockManager.blockManagerId.host
+  }
+
+  private def shuffleMetricsUpdate(
+    blockId: BlockId,
+    buf: ManagedBuffer,
+    isLocal: Boolean): Unit = {
+    if (isLocal) {
+      shuffleLocalMetricsUpdate(blockId, buf)
+    } else {
+      shuffleRemoteMetricsUpdate(blockId, buf)
+    }
+  }
+
+  private def shuffleLocalMetricsUpdate(blockId: BlockId, buf: ManagedBuffer): Unit = {
+    // Check if the block is within the host-local blocks set, or if it is a merged local
+    // block. In either case, it is local read and we need to increase the local
+    // shuffle read metrics.
+    blockId match {
+      case chunkId: ShuffleBlockChunkId =>
+        shuffleMetrics.incLocalMergedChunksFetched(1)
+        shuffleMetrics.incLocalMergedBlocksFetched(getShuffleChunkCardinality(chunkId))
+        shuffleMetrics.incLocalMergedBlocksBytesRead(buf.size)
+        shuffleMetrics.incLocalBlocksFetched(getShuffleChunkCardinality(chunkId))

Review Comment:
   Compute `getShuffleChunkCardinality` once and reuse it for both invocations. Here and in `shuffleRemoteMetricsUpdate` below.



##########
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala:
##########
@@ -1219,7 +1219,12 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
             new RuntimeException("forced error"))
         }

Review Comment:
   Also update the other tests, particularly related to push based shuffle to assert on the expected metrics ?
   Ideally, the original tests should have done it (if the metrics had been added initially) - and so now we have to retroactively fix them ...



##########
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala:
##########
@@ -1780,7 +1802,19 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |      "Remote Bytes Read" : 0,
       |      "Remote Bytes Read To Disk" : 0,
       |      "Local Bytes Read" : 0,
-      |      "Total Records Read" : 0
+      |      "Total Records Read" : 0,
+      |      "Remote Requests Duration": 0,
+      |      "Push Based": {

Review Comment:
   Rename this to "Push Based Shuffle" instead ?
   It is already within the context of `Shuffle Read Metrics`, but this makes it clearer.
   
   Thoughts ?
   
   +CC @otterc, @Ngone51 



##########
core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala:
##########
@@ -78,6 +78,16 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
         shuffleLocalBytesRead = 1L,
         shuffleReadBytes = 1L,
         shuffleReadRecords = 1L,
+        shuffleCorruptMergedBlockChunks = 2L,
+        shuffleFallbackCount = 2L,
+        shuffleMergedRemoteBlocksFetched = 1L,
+        shuffleMergedLocalBlocksFetched = 1L,
+        shuffleMergedRemoteChunksFetched = 1L,
+        shuffleMergedLocalChunksFetched = 1L,
+        shuffleMergedRemoteBytesRead = 1L,
+        shuffleMergedLocalBytesRead = 1L,
+        shuffleRemoteReqsDuration = 1L,
+        shuffleMergedRemoteReqsDuration = 1L,

Review Comment:
   Any particular reason these are set to `1` ?
   Particularly when `shuffleMergersCount` == 0 ?



##########
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala:
##########
@@ -1653,7 +1663,19 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |      "Remote Bytes Read": 1000,
       |      "Remote Bytes Read To Disk": 400,
       |      "Local Bytes Read": 1100,
-      |      "Total Records Read": 10
+      |      "Total Records Read": 10,
+      |      "Remote Requests Duration": 900,
+      |      "Push Based": {
+      |         "Corrupt Merged Block Chunks" : 100,
+      |         "Fallback Count" : 100,

Review Comment:
   Curious why these are nonzero if push based shuffle was disabled ?
   Are these simply values to test serde ?



##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -233,15 +233,11 @@ final class ShuffleBlockFetcherIterator(
       result match {
         case SuccessFetchResult(blockId, mapIndex, address, _, buf, _) =>
           if (address != blockManager.blockManagerId) {
-            if (hostLocalBlocks.contains(blockId -> mapIndex)) {
-              shuffleMetrics.incLocalBlocksFetched(1)
-              shuffleMetrics.incLocalBytesRead(buf.size)
+            if (hostLocalBlocks.contains(blockId -> mapIndex)
+              || isLocalMergedBlockAddress(address)) {

Review Comment:
   super nit: check for `isLocalMergedBlockAddress` first ?



##########
core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala:
##########
@@ -290,6 +301,7 @@ private class PushBasedFetchHelper(
       address: BlockManagerId): Unit = {
     assert(blockId.isInstanceOf[ShuffleMergedBlockId] || blockId.isInstanceOf[ShuffleBlockChunkId])
     logWarning(s"Falling back to fetch the original blocks for push-merged block $blockId")
+    shuffleMetrics.incFallbackCount(1)

Review Comment:
   This should be moved into the `case shuffleBlockId: ShuffleMergedBlockId` block below.
   Else, for the `case _` block, we will be off by `1`



##########
core/src/main/scala/org/apache/spark/status/storeTypes.scala:
##########
@@ -138,6 +138,16 @@ private[spark] object TaskIndexNames {
   final val SHUFFLE_WRITE_RECORDS = "swr"
   final val SHUFFLE_WRITE_SIZE = "sws"
   final val SHUFFLE_WRITE_TIME = "swt"
+  final val PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS = "scmbc"
+  final val PUSH_BASED_SHUFFLE_FALLBACK_COUNT = "sfc"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS = "smrb"
+  final val PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS = "smlb"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS = "smrc"
+  final val PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS = "smlc"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_READS = "smrr"
+  final val PUSH_BASED_SHUFFLE_MERGED_LOCAL_READS = "smlr"
+  final val PUSH_BASED_SHUFFLE_REMOTE_REQS_DURATION = "srrd"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION = "smrrd"

Review Comment:
   add `pb` as prefix ?



##########
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala:
##########
@@ -2623,48 +2677,118 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |        },
       |        {
       |          "ID": 19,
+      |          "Name": "${shuffleRead.FALLBACK_COUNT}",
+      |          "Update": 0,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID" : 20,
+      |          "Name" : "${shuffleRead.REMOTE_MERGED_BLOCKS_FETCHED}",
+      |          "Update" : 0,
+      |          "Internal" : true,
+      |          "Count Failed Values" : true
+      |        },
+      |        {
+      |          "ID" : 21,
+      |          "Name" : "${shuffleRead.LOCAL_MERGED_BLOCKS_FETCHED}",
+      |          "Update" : 0,
+      |          "Internal" : true,
+      |          "Count Failed Values" : true
+      |        },
+      |        {
+      |          "ID" : 22,
+      |          "Name" : "${shuffleRead.REMOTE_MERGED_CHUNKS_FETCHED}",
+      |          "Update" : 0,
+      |          "Internal" : true,
+      |          "Count Failed Values" : true
+      |        },
+      |        {
+      |          "ID" : 23,
+      |          "Name" : "${shuffleRead.LOCAL_MERGED_CHUNKS_FETCHED}",
+      |          "Update" : 0,
+      |          "Internal" : true,
+      |          "Count Failed Values" : true
+      |        },
+      |        {
+      |          "ID" : 24,
+      |          "Name" : "${shuffleRead.REMOTE_MERGED_BLOCKS_READ}",
+      |          "Update" : 0,
+      |          "Internal" : true,
+      |          "Count Failed Values" : true
+      |        },
+      |        {
+      |          "ID" : 25,
+      |          "Name" : "${shuffleRead.LOCAL_MERGED_BLOCKS_READ}",
+      |          "Update" : 0,
+      |          "Internal" : true,
+      |          "Count Failed Values" : true
+      |        },
+      |        {
+      |          "ID" : 26,
+      |          "Name" : "${shuffleRead.REMOTE_REQS_DURATION}",
+      |          "Update" : 0,
+      |          "Internal" : true,
+      |          "Count Failed Values" : true
+      |        },
+      |        {
+      |          "ID" : 27,
+      |          "Name" : "${shuffleRead.REMOTE_MERGED_REQS_DURATION}",
+      |          "Update" : 0,
+      |          "Internal" : true,
+      |          "Count Failed Values" : true
+      |        },
+      |        {
+      |          "ID": 28,
+      |          "Name": "${shuffleWrite.BYTES_WRITTEN}",
+      |          "Update": 0,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 29,
       |          "Name": "${shuffleWrite.RECORDS_WRITTEN}",
       |          "Update": 0,
       |          "Internal": true,
       |          "Count Failed Values": true
       |        },
       |        {
-      |          "ID": 20,
+      |          "ID": 30,
       |          "Name": "${shuffleWrite.WRITE_TIME}",
       |          "Update": 0,
       |          "Internal": true,
       |          "Count Failed Values": true
       |        },
       |        {
-      |          "ID": 21,
+      |          "ID": 31,
       |          "Name": "${input.BYTES_READ}",
       |          "Update": 2100,
       |          "Internal": true,
       |          "Count Failed Values": true
       |        },
       |        {
-      |          "ID": 22,
+      |          "ID": 32,
       |          "Name": "${input.RECORDS_READ}",
       |          "Update": 21,
       |          "Internal": true,
       |          "Count Failed Values": true
       |        },
       |        {
-      |          "ID": 23,
+      |          "ID": 33,
       |          "Name": "${output.BYTES_WRITTEN}",
       |          "Update": 1200,
       |          "Internal": true,
       |          "Count Failed Values": true
       |        },
       |        {
-      |          "ID": 24,
+      |          "ID": 34,
       |          "Name": "${output.RECORDS_WRITTEN}",

Review Comment:
   Curious why some of these values changed.
   I am yet to look at tests more carefully though ...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org