You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/08 19:40:40 UTC

[GitHub] [spark] otterc commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side read metrics

otterc commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1043701720


##########
core/src/main/scala/org/apache/spark/status/api/v1/api.scala:
##########
@@ -302,7 +312,9 @@ class StageData private[spark](
     @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
     val peakExecutorMetrics: Option[ExecutorMetrics],
     val taskMetricsDistributions: Option[TaskMetricDistributions],
-    val executorMetricsDistributions: Option[ExecutorMetricsDistributions])
+    val executorMetricsDistributions: Option[ExecutorMetricsDistributions],
+    val isPushBasedShuffleEnabled: Boolean,

Review Comment:
   Nit: We have been referring to `pushBasedShuffle` in the OSS as just either `push` or `pushMerge`. I think we should remove `Based` from all the metrics-related code. Rename this to `isShufflePushEnabled`?



##########
core/src/main/scala/org/apache/spark/status/storeTypes.scala:
##########
@@ -138,6 +138,16 @@ private[spark] object TaskIndexNames {
   final val SHUFFLE_WRITE_RECORDS = "swr"
   final val SHUFFLE_WRITE_SIZE = "sws"
   final val SHUFFLE_WRITE_TIME = "swt"
+  final val SHUFFLE_REMOTE_REQS_DURATION = "srrd"
+  final val PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS = "pbscmbc"
+  final val PUSH_BASED_SHUFFLE_MERGED_FETCH_FALLBACK_COUNT = "pbsmffc"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS = "pbsmrb"
+  final val PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS = "pbsmlb"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS = "pbsmrc"
+  final val PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS = "pbsmlc"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_READS = "pbsmrr"
+  final val PUSH_BASED_SHUFFLE_MERGED_LOCAL_READS = "pbsmlr"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION = "pbsmrrd"

Review Comment:
   Nit: Remove `BASED`



##########
core/src/main/scala/org/apache/spark/status/api/v1/api.scala:
##########
@@ -349,14 +361,27 @@ class OutputMetrics private[spark](
     val bytesWritten: Long,
     val recordsWritten: Long)
 
+class ShufflePushReadMetrics private[spark](
+  val corruptMergedBlockChunks: Long,
+  val mergedFetchFallbackCount: Long,
+  val remoteMergedBlocksFetched: Long,
+  val localMergedBlocksFetched: Long,
+  val remoteMergedChunksFetched: Long,
+  val localMergedChunksFetched: Long,
+  val remoteMergedBytesRead: Long,
+  val localMergedBytesRead: Long,
+  val remoteMergedReqsDuration: Long)
+
 class ShuffleReadMetrics private[spark](
     val remoteBlocksFetched: Long,
     val localBlocksFetched: Long,
     val fetchWaitTime: Long,
     val remoteBytesRead: Long,
     val remoteBytesReadToDisk: Long,
     val localBytesRead: Long,
-    val recordsRead: Long)
+    val recordsRead: Long,
+    val remoteReqsDuration: Long,
+    val pushBasedShuffle: ShufflePushReadMetrics)

Review Comment:
   Nit: rename to `shufflePushReadMetrics`



##########
core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala:
##########
@@ -115,6 +115,26 @@ class ExecutorSource(
     metricRegistry.counter(MetricRegistry.name("shuffleBytesWritten"))
   val METRIC_SHUFFLE_RECORDS_WRITTEN =
     metricRegistry.counter(MetricRegistry.name("shuffleRecordsWritten"))
+  val METRIC_SHUFFLE_REMOTE_REQS_DURATION =
+    metricRegistry.counter(MetricRegistry.name("shuffleRemoteReqsDuration"))
+  val METRIC_PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS =
+    metricRegistry.counter(MetricRegistry.name("shuffleCorruptMergedBlockChunks"))
+  val METRIC_PUSH_BASED_SHUFFLE_MERGED_FETCH_FALLBACK_COUNT =
+    metricRegistry.counter(MetricRegistry.name("shuffleMergedFetchFallbackCount"))
+  val METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS_FETCHED =
+    metricRegistry.counter(MetricRegistry.name("shuffleMergedRemoteBlocksFetched"))
+  val METRIC_PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS_FETCHED =
+    metricRegistry.counter(MetricRegistry.name("shuffleMergedLocalBlocksFetched"))
+  val METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS_FETCHED =
+    metricRegistry.counter(MetricRegistry.name("shuffleMergedRemoteChunksFetched"))
+  val METRIC_PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS_FETCHED =
+    metricRegistry.counter(MetricRegistry.name("shuffleMergedLocalChunksFetched"))
+  val METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_BYTES_READ =
+    metricRegistry.counter(MetricRegistry.name("shuffleMergedRemoteBytesRead"))
+  val METRIC_PUSH_BASED_SHUFFLE_MERGED_LOCAL_BYTES_READ =
+    metricRegistry.counter(MetricRegistry.name("shuffleMergedLocalBytesRead"))
+  val METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION =

Review Comment:
   Nit: Remove `BASED`



##########
core/src/main/scala/org/apache/spark/status/api/v1/api.scala:
##########
@@ -436,7 +472,9 @@ class ShuffleReadMetricDistributions private[spark](
     val fetchWaitTime: IndexedSeq[Double],
     val remoteBytesRead: IndexedSeq[Double],
     val remoteBytesReadToDisk: IndexedSeq[Double],
-    val totalBlocksFetched: IndexedSeq[Double])
+    val totalBlocksFetched: IndexedSeq[Double],
+    val remoteReqsDuration: IndexedSeq[Double],
+    val pushBasedShuffle: ShufflePushReadMetricDistributions)

Review Comment:
   Nit: same here. rename to `shufflePushMetricsDist`



##########
core/src/main/scala/org/apache/spark/status/LiveEntity.scala:
##########
@@ -843,6 +917,25 @@ private[spark] object LiveEntityHelpers {
         m2.shuffleReadMetrics.remoteBytesReadToDisk * mult,
       m1.shuffleReadMetrics.localBytesRead + m2.shuffleReadMetrics.localBytesRead * mult,
       m1.shuffleReadMetrics.recordsRead + m2.shuffleReadMetrics.recordsRead * mult,
+      m1.shuffleReadMetrics.pushBasedShuffle.corruptMergedBlockChunks +
+        m2.shuffleReadMetrics.pushBasedShuffle.corruptMergedBlockChunks * mult,
+      m1.shuffleReadMetrics.pushBasedShuffle.mergedFetchFallbackCount +
+        m2.shuffleReadMetrics.pushBasedShuffle.mergedFetchFallbackCount * mult,
+      m1.shuffleReadMetrics.pushBasedShuffle.remoteMergedBlocksFetched +
+        m2.shuffleReadMetrics.pushBasedShuffle.remoteMergedBlocksFetched * mult,
+      m1.shuffleReadMetrics.pushBasedShuffle.localMergedBlocksFetched +
+        m2.shuffleReadMetrics.pushBasedShuffle.localMergedBlocksFetched * mult,
+      m1.shuffleReadMetrics.pushBasedShuffle.remoteMergedChunksFetched +
+        m2.shuffleReadMetrics.pushBasedShuffle.remoteMergedChunksFetched * mult,
+      m1.shuffleReadMetrics.pushBasedShuffle.localMergedChunksFetched +
+        m2.shuffleReadMetrics.pushBasedShuffle.localMergedChunksFetched * mult,
+      m1.shuffleReadMetrics.pushBasedShuffle.remoteMergedBytesRead +
+        m2.shuffleReadMetrics.pushBasedShuffle.remoteMergedBytesRead * mult,
+      m1.shuffleReadMetrics.pushBasedShuffle.localMergedBytesRead +
+        m2.shuffleReadMetrics.pushBasedShuffle.localMergedBytesRead * mult,
+      m1.shuffleReadMetrics.remoteReqsDuration + m2.shuffleReadMetrics.remoteReqsDuration * mult,
+      m1.shuffleReadMetrics.pushBasedShuffle.remoteMergedReqsDuration +
+        m2.shuffleReadMetrics.pushBasedShuffle.remoteMergedChunksFetched * mult,

Review Comment:
   This should be `remoteMergedReqsDuration` instead of `remoteMergedChunksFetched`



##########
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala:
##########
@@ -405,11 +416,42 @@ private[spark] class AppStatusStore(
         },
         scanTasks(TaskIndexNames.SHUFFLE_TOTAL_BLOCKS) { m =>
           m.shuffleLocalBlocksFetched + m.shuffleRemoteBlocksFetched
-        }),
-      shuffleWriteMetrics = new v1.ShuffleWriteMetricDistributions(
-        scanTasks(TaskIndexNames.SHUFFLE_WRITE_SIZE) { t => t.shuffleBytesWritten },
-        scanTasks(TaskIndexNames.SHUFFLE_WRITE_RECORDS) { t => t.shuffleRecordsWritten },
-        scanTasks(TaskIndexNames.SHUFFLE_WRITE_TIME) { t => t.shuffleWriteTime }))
+        },
+        scanTasks(TaskIndexNames.SHUFFLE_REMOTE_REQS_DURATION) {
+          t => t.shuffleRemoteReqsDuration
+        },
+        new v1.ShufflePushReadMetricDistributions(
+          scanTasks(TaskIndexNames.PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS) { t =>
+            t.shuffleCorruptMergedBlockChunks
+          },
+          scanTasks(TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_FETCH_FALLBACK_COUNT) {
+            t => t.shuffleMergedFetchFallbackCount
+          },
+          scanTasks(TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS) { t =>
+            t.shuffleMergedRemoteBlocksFetched
+          },
+          scanTasks(TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS) { t =>
+            t.shuffleMergedLocalBlocksFetched
+          },
+          scanTasks(TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS) { t =>
+            t.shuffleMergedRemoteChunksFetched
+          },
+          scanTasks(TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS) { t =>
+            t.shuffleMergedLocalChunksFetched
+          },
+          scanTasks(TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_READS) { t =>
+            t.shuffleMergedRemoteBytesRead
+          },
+          scanTasks(TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_READS) { t =>
+            t.shuffleMergedLocalBytesRead
+          },
+          scanTasks(TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION) { t =>
+            t.shuffleMergedRemoteReqDuration
+          })),

Review Comment:
   Nit: Remove `BASED` from the `TaskIndexNames` constants



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org