Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/02 21:43:23 UTC

[GitHub] [spark] zhouyejoe commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

zhouyejoe commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1012322498


##########
core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala:
##########
@@ -227,6 +227,16 @@ class TaskMetrics private[spark] () extends Serializable {
     shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
     shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
     shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
+    shuffleRead.CORRUPT_MERGED_BLOCK_CHUNKS -> shuffleReadMetrics._corruptMergedBlockChunks,
+    shuffleRead.FALLBACK_COUNT -> shuffleReadMetrics._fallbackCount,
+    shuffleRead.REMOTE_MERGED_BLOCKS_FETCHED -> shuffleReadMetrics._remoteMergedBlocksFetched,
+    shuffleRead.LOCAL_MERGED_BLOCKS_FETCHED -> shuffleReadMetrics._localMergedBlocksFetched,
+    shuffleRead.REMOTE_MERGED_CHUNKS_FETCHED -> shuffleReadMetrics._remoteMergedChunksFetched,
+    shuffleRead.LOCAL_MERGED_CHUNKS_FETCHED -> shuffleReadMetrics._localMergedChunksFetched,
+    shuffleRead.REMOTE_MERGED_BYTES_READ -> shuffleReadMetrics._remoteMergedBlocksBytesRead,
+    shuffleRead.LOCAL_MERGED_BYTES_READ -> shuffleReadMetrics._localMergedBlocksBytesRead,
+    shuffleRead.REMOTE_REQS_DURATION -> shuffleReadMetrics._remoteReqsDuration,
+    shuffleRead.REMOTE_MERGED_REQS_DURATION -> shuffleReadMetrics._remoteMergedReqsDuration,
     shuffleWrite.BYTES_WRITTEN -> shuffleWriteMetrics._bytesWritten,
     shuffleWrite.RECORDS_WRITTEN -> shuffleWriteMetrics._recordsWritten,
     shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime,

Review Comment:
   I think we are missing the three push-related metrics for the ShuffleWriter here: "BlocksNotPushed", "BlocksTooLate" and "BlocksCollided". The other relevant parts are also missing: the metrics logging in the Executor, the rendering in SHS, and the SQLShuffleMetricsReporter. 
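A minimal sketch of registering the three push-related write metrics next to the existing ones. This is not Spark's actual code. The class `PushWriteMetricsSketch`, its plain `Long` fields, the metric name strings and the `missingNames` helper are all hypothetical; Spark's real metrics are backed by accumulators. The point is that a metric absent from the name-to-value map is never reported, so a simple coverage check can catch an omission like this one.

```scala
import scala.collection.immutable.ListMap

// Hypothetical, simplified stand-in for push-based shuffle write metrics.
// Plain Longs replace Spark's accumulators to keep the sketch self-contained.
final class PushWriteMetricsSketch {
  var bytesWritten: Long = 0L
  var recordsWritten: Long = 0L
  var blocksNotPushed: Long = 0L
  var blocksTooLate: Long = 0L
  var blocksCollided: Long = 0L
}

object PushWriteMetricsSketch {
  // Hypothetical metric names, chosen only for this illustration.
  val ExpectedNames: Seq[String] = Seq(
    "bytesWritten", "recordsWritten",
    "blocksNotPushed", "blocksTooLate", "blocksCollided")

  // Name -> current value. A metric left out of this map is never reported,
  // similar in spirit to the name-to-accumulator map in TaskMetrics.
  def nameToValue(m: PushWriteMetricsSketch): ListMap[String, Long] = ListMap(
    "bytesWritten" -> m.bytesWritten,
    "recordsWritten" -> m.recordsWritten,
    "blocksNotPushed" -> m.blocksNotPushed,
    "blocksTooLate" -> m.blocksTooLate,
    "blocksCollided" -> m.blocksCollided)

  // Returns the expected names that are not registered, in declaration order.
  def missingNames(registered: Iterable[String]): Seq[String] = {
    val present = registered.toSet
    ExpectedNames.filterNot(present.contains)
  }
}
```

A unit test could assert that `missingNames(nameToValue(m).keys)` is empty. That assertion fails as soon as a new metric is defined but not registered.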



##########
core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala:
##########
@@ -146,6 +268,16 @@ private[spark] class TempShuffleReadMetrics extends ShuffleReadMetricsReporter {
   override def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
   override def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v
   override def incRecordsRead(v: Long): Unit = _recordsRead += v
+  override def incCorruptMergedBlockChunks(v: Long): Unit = _corruptMergedBlockChunks += v
+  override def incFallbackCount(v: Long): Unit = _fallbackCount += v
+  override def incRemoteMergedBlocksFetched(v: Long): Unit = _remoteMergedBlocksFetched += v
+  override def incLocalMergedBlocksFetched(v: Long): Unit = _localMergedBlocksFetched += v
+  override def incRemoteMergedChunksFetched(v: Long): Unit = _remoteMergedChunksFetched += v
+  override def incLocalMergedChunksFetched(v: Long): Unit = _localMergedChunksFetched += v
+  override def incRemoteMergedBlocksBytesRead(v: Long): Unit = _remoteMergedBlocksBytesRead += v
+  override def incLocalMergedBlocksBytesRead(v: Long): Unit = _localMergedBlocksBytesRead += v
+  override def incRemoteReqsDuration(v: Long): Unit = _remoteReqsDuration += v
+  override def incRemoteMergedReqsDuration(v: Long): Unit = _remoteMergedReqsDuration += v

Review Comment:
   I can find the callers of these two methods (incRemoteReqsDuration and incRemoteMergedReqsDuration) in SHS, where they rebuild the metrics from JSON, but the caller in ShuffleBlockFetcherIterator is missing from this patch. The Executor-side caller, which measures the remote request duration, needs more refactoring in the iterator. We could probably carve these two metrics out of this PR. If needed, we can create another PR that tracks the remote request duration for both unmerged and merged shuffle. @mridulm @otterc Thoughts? 
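A minimal sketch of tracking remote request duration separately for unmerged and merged fetches. It assumes each request has an id and that the caller knows, when it sends the request, whether the request targets merged blocks. `RemoteRequestDurationTracker` and its methods are hypothetical and are not part of ShuffleBlockFetcherIterator. Durations are in whatever unit the injected clock returns, which is nanoseconds by default.

```scala
import scala.collection.mutable

// Hypothetical sketch: attribute remote fetch-request latency to either
// unmerged or merged shuffle blocks. The clock is injectable so the logic
// can be tested deterministically.
final class RemoteRequestDurationTracker(clock: () => Long = () => System.nanoTime()) {
  // requestId -> (start time, whether the request is for merged blocks)
  private val inFlight = mutable.HashMap.empty[Long, (Long, Boolean)]
  private var _remoteReqsDuration = 0L
  private var _remoteMergedReqsDuration = 0L

  // Record the send time of a request.
  def onRequestSent(requestId: Long, isMerged: Boolean): Unit =
    inFlight(requestId) = (clock(), isMerged)

  // Record completion (success or failure). Unknown or already-completed
  // ids are ignored, so a request is never counted twice.
  def onRequestCompleted(requestId: Long): Unit =
    inFlight.remove(requestId).foreach { case (start, isMerged) =>
      val elapsed = clock() - start
      if (isMerged) _remoteMergedReqsDuration += elapsed
      else _remoteReqsDuration += elapsed
    }

  def remoteReqsDuration: Long = _remoteReqsDuration
  def remoteMergedReqsDuration: Long = _remoteMergedReqsDuration
}
```

Removing the entry on completion keeps the totals correct even if a completion is reported more than once. In the real iterator, the harder part is locating the right send and completion points, which is the refactoring mentioned above.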



##########
core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala:
##########
@@ -78,6 +78,16 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
         shuffleLocalBytesRead = 1L,
         shuffleReadBytes = 1L,
         shuffleReadRecords = 1L,
+        shuffleCorruptMergedBlockChunks = 2L,
+        shuffleFallbackCount = 2L,

Review Comment:
   Any particular reason these are set to 2?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


For additional commands, e-mail: reviews-help@spark.apache.org