You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/09/30 07:05:36 UTC

[GitHub] [spark] zhouyejoe opened a new pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

zhouyejoe opened a new pull request #34156:
URL: https://github.com/apache/spark/pull/34156


   We found an issue where user configured both AQE and push based shuffle, but the job started to hang after running some  stages. We took the thread dump from the Executors, which showed the task is still waiting to fetch shuffle blocks.
   Proposed changes in the PR to fix the issue.
   
   ### What changes were proposed in this pull request?
   Disabled Batch fetch when push based shuffle is enabled. 
   
   ### Why are the changes needed?
   Without this patch, enabling AQE and Push based shuffle will have a chance to hang the tasks.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Tested the PR within our PR, with Spark shell and the queries are:
   
   sql("""SELECT CASE WHEN rand() < 0.8 THEN 100 ELSE CAST(rand() * 30000000 AS INT) END AS s_item_id, CAST(rand() * 100 AS INT) AS s_quantity, DATE_ADD(current_date(), - CAST(rand() * 360 AS INT)) AS s_date FROM RANGE(1000000000)""").createOrReplaceTempView("sales")
   // Dynamically coalesce partitions
   sql("""SELECT s_date, sum(s_quantity) AS q FROM sales GROUP BY s_date ORDER BY q DESC""").collect
   
   Unit tests to be added.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935514467


   **[Test build #143871 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143871/testReport)** for PR 34156 at commit [`9d34e03`](https://github.com/apache/spark/commit/9d34e035828ccbb09061ee5f12a913eef1f07f10).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang closed pull request #34156: [SPARK-36892][Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
gengliangwang closed pull request #34156:
URL: https://github.com/apache/spark/pull/34156


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935370054


   **[Test build #143864 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143864/testReport)** for PR 34156 at commit [`37f77b7`](https://github.com/apache/spark/commit/37f77b7e4a5c01dcd4a3d61ac41558e1b14a8fd5).
    * This patch **fails Spark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhouyejoe commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r721791399



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1447,8 +1451,9 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle,
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
-    if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+    if (mergeStatuses.exists(_.nonEmpty) && mergeStatuses.exists(_.exists(_ != null))

Review comment:
       Thanks. I do have updated the PR to exclude this check.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720675561



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       @Ngone51 We would be trying to infer behavior in this case based on the response, instead of being explicit about what should be enabled/disabled (btw, `blocksByAddress` is iterator, so we would need a `.toList` or some such as well).
   
   The implication of that is, for example, without the `mergeStatuses.exists(_.exists(_ != null))` in `convertMapStatuses`, `hasMergedBlock` will be false - and so we would enable batch fetch - and fail.
   I want to avoid tight coupling based on pattern of blocks in response, and instead explicitly let the response indicate whether we should enable/disable feature. 
   This should allow us to reasonably independently evolve both features in future as well.
   
   Thoughts ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932710472


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48311/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932728892


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143799/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932897628


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143809/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719637656



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       I believe this should suffice:
   ```
       if (mergeStatuses.exists(_.nonEmpty) && mergeStatuses.exists(_.exists(_ != null))
         && startMapIndex == 0
         && endMapIndex == mapStatuses.length) {
   ```
   
   and we can keep the changes to `BlockStoreShuffleReader`.
   Thoughts ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720677576



##########
File path: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
##########
@@ -129,11 +129,20 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       endPartition: Int,
       context: TaskContext,
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
-    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
-      handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+    val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
+    val (blocksByAddress, enableBatchFetch) =
+      if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
+        val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
+          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+        (res.iter, res.enableBatchFetch)
+      } else {
+        val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
+          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+        (address, canUseBatchFetch(startPartition, endPartition, context))
+      }
     new BlockStoreShuffleReader(
       handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
-      shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
+      shouldBatchFetch = enableBatchFetch)
   }

Review comment:
       nit: Cleaner would be:
   
   * rename `enableBatchFetch` to `canEnableBatchFetch`.
   * Set to `true` in else block.
   * here, do: `shouldBatchFetch = canEnableBatchFetch && canUseBatchFetch(startPartition, endPartition, context)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720615835



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       Yeah, but as you may know, it's very common that Spark private APIs are abused as public APIs. We did break downstream use cases by changing private APIs, e.g., SQL UDF stuff.  And I know some users implement their own shuffle module by extending `ShuffleManager`,  `MapOutputTracker`, etc. Given that we're very close to the 3.2 release, I think it's not appropriate to have this kind of change at this moment. 
   
   If you guys still insist on  this way, I'd suggest to totally separate the push-based shuffle path with the normal shuffle path from `SortShuffleManager.getReader`, e.g.,
   
   ```scala
     override def getReader[K, C](
         handle: ShuffleHandle,
         startMapIndex: Int,
         endMapIndex: Int,
         startPartition: Int,
         endPartition: Int,
         context: TaskContext,
         metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
       val (blocksByAddress, enableBatchFetch ) = if (fetchMergeResult) {
         val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(...)
         (res.iter, res.enableBatchFetch)
       } else {
         val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(...)
         (address, canUseBatchFetch(startPartition, endPartition, context))
       }
       val blocksByAddress = 
       new BlockStoreShuffleReader(
         handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress., context, metrics,
         shouldBatchFetch = enableBatchFetch)
             
     }
   
   ```
     




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719576838



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       > As demonstrated in this PR inside BlockStoreShuffleReader, we can disable batch fetch when push-based shuffle is enabled and it's an equally simple fix.
   
   Ok, so you mean to disable batch fetch for all the cases when PBS enabled rather than the partition coalesce case only. Then, of course, the change is also simple.
   
   
   But in this way, people may hit perf issue for the case like `PartialMapperPartitionSpec` unless you can guarantee that the improved perf of PBS is bigger than batch fetch in any cases.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931105600


   ok to test


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931160777






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931774843


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143772/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719531296



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       > My concern with only keeping this condition (endPartition - startPartition == 1) instead of disable batch fetch is that it will effectively disable push-based shuffle in 3.2.0 even if the push.enabled flag is set to true.
   
   ~~IUC, push-based shuffle currently can only work with one reduce partition, no? ~~ And I didn't see `endPartition - startPartition == 1` disabled push-based shuffle (PBS) for any cases except "partition coalesce".
   
   To clarify, there's no other cases would have multiple reduce partition if PBS wants to take effect due to the limit of  `startMapIndex == 0 && endMapIndex == mapStatuses.length`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931226388


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48276/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932897425


   **[Test build #143809 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143809/testReport)** for PR 34156 at commit [`b17fe5a`](https://github.com/apache/spark/commit/b17fe5ab0e13d1f6d29ae99d31970904d98c55a2).
    * This patch **fails Spark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719576838



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       > As demonstrated in this PR inside BlockStoreShuffleReader, we can disable batch fetch when push-based shuffle is enabled and it's an equally simple fix.
   
   Ok, so you mean to disable batch fetch for all the cases when push-based shuffle rather than the partition coalesce case only. Then, of course, the change is also simple.
   
   
   But in this way, people may hit perf issue for the case like `PartialMapperPartitionSpec` unless you can guarantee that the improved perf of PBS is bigger than batch fetch in any cases.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931768419


   **[Test build #143772 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143772/testReport)** for PR 34156 at commit [`bce8c46`](https://github.com/apache/spark/commit/bce8c460f068d596d62f656d83e45bc27592443d).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932657417


   **[Test build #143798 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143798/testReport)** for PR 34156 at commit [`2a7a658`](https://github.com/apache/spark/commit/2a7a658b5272752b845babe20136e99498bec136).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Victsm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Victsm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719496790



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       Push-based shuffle in practice shouldn't interfere with partition coalesce.
   A reducer should be able to fetch from multiple shuffle partitions as separate merged shuffle partitions, and that's the current behavior implemented in this code.
   The only thing that prevents it from working properly is that `ShuffleBlockFetcherIterator` later on tries to merge continuous blocks generated in this case for batch fetch, which is totally unnecessary.
   In 3.2.0, AQE, partition coalesce, and batch fetch are all default to true.
   My concern with this condition is that it will effectively disable push-based shuffle in 3.2.0 even if the push.enabled flag is set to true.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719576838



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       > As demonstrated in this PR inside BlockStoreShuffleReader, we can disable batch fetch when push-based shuffle is enabled and it's an equally simple fix.
   
   Ok, so you mean to disable batch fetch for all the cases when PBS enabled rather than the partition coalesce case only. Then, of course, the change is also simple.
   
   
   But in this way, people may hit perf issue for the case like `PartialMapperPartitionSpec` unless you can guarantee that the improved perf of PBS is better than batch fetch in any cases.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720675561



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       @Ngone51 We would be trying to infer behavior in this case based on the response, instead of being explicit about what should be enabled/disabled (btw, `blocksByAddress` is iterator, so we would need a `.toList` or some such as well).
   
   The implication of that is, for example, without the `mergeStatuses.exists(_.exists(_ != null))` in `convertMapStatuses`, `hasMergedBlock` will be false - and so we would enable batch fetch - and fail.
   I want to avoid tight coupling based on pattern of blocks in response, and instead explicitly let the response indicate whether we should enable/disable feature. 
   This should allow us to reasonably independently evolve both features in future as well (for example, if push based shuffle starts supporting fetching subset of mapper outputs for some conditions based on some constraints in `MergeStatus`)
   
   Thoughts ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931626310


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48283/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34156: [SPARK-36892][Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935514467


   **[Test build #143871 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143871/testReport)** for PR 34156 at commit [`9d34e03`](https://github.com/apache/spark/commit/9d34e035828ccbb09061ee5f12a913eef1f07f10).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang commented on pull request #34156: [SPARK-36892][Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935626206


   I think we can have the new RC after this one is merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935305619


   **[Test build #143866 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143866/testReport)** for PR 34156 at commit [`994a0a1`](https://github.com/apache/spark/commit/994a0a107f493c69f9abbd68a3f2329455cbe99e).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang commented on pull request #34156: [SPARK-36892][Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935669445


   Thanks, merging to master/3.2


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932892588


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48321/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719450502



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       ~~Not push-based shuffle, but batch fetch is still doable, right?~~
   
   So if "both conditions" refers to "endPartition - startPartition == 1" & disable batch fetch, I'd say we'd end up not using batch fetch for coalasced partitions. And in anyways, push-based shuffle should not be used for coalasced partitions as it could contain multiple reduce partitions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931231400


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48276/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932886498


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48321/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932721059


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48311/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719531296



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       > My concern with only keeping this condition (endPartition - startPartition == 1) instead of disable batch fetch is that it will effectively disable push-based shuffle in 3.2.0 even if the push.enabled flag is set to true.
   
   ~~IUC, push-based shuffle currently can only work with one reduce partition, no?~~ And I didn't see `endPartition - startPartition == 1` disabled push-based shuffle (PBS) for any cases except "partition coalesce".
   
   To clarify, there's no other cases with multiple reduce partition that could apply to PBS due to the limit of  `startMapIndex == 0 && endMapIndex == mapStatuses.length`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932679441


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143798/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720612800



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1447,8 +1451,9 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle,
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
-    if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+    if (mergeStatuses.exists(_.nonEmpty) && mergeStatuses.exists(_.exists(_ != null))

Review comment:
       does this check still makes sense `mergeStatuses.exists(_.nonEmpty)` as it will always be `Some(Array())` when push-based shuffle flag is set to true only will be `None` in the case when push-based shuffle flag is set to false? As in which is anyway covered with the new check `mergeStatuses.exists(_.exists(_ != null)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720613555



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       `MapOutputTracker` is `private[spark]` and not part of exposed API.
   Am I missing something here ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719576838



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       > As demonstrated in this PR inside BlockStoreShuffleReader, we can disable batch fetch when push-based shuffle is enabled and it's an equally simple fix.
   
   Ok, so you mean to disable batch fetch for all the cases when PBS enabled rather than the partition coalesce case only. Then, of course, the change is also simple.
   
   
   But in this way, people may hit perf issue for the case like `PartialMapperPartitionSpec` (which is applied to batch fetch but not PBS) unless you can guarantee that the improved perf of PBS is better than batch fetch in any cases.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720686921



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       Sounds good.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935444522


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143866/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935438470


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48379/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719669365



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       To elaborate:
   
   * If push based shuffle is enabled for a stage we want to disable block fetch.
     * The check in `BlockStoreShuffleReader` does this.
   * If block fetch is enabled, we want to make sure it always goes through the `else` condition here.
     * block fetch assumes ordering of id's which get violated in the `if` block here.
   
   
   For background, `mergeStatuses != null` and `mergeStatuses.length == numReducers` when `Utils.isPushBasedShuffleEnabled == true` and `mergeStatuses = Array.empty` (or `null` depending on codepath) when it is disabled.
   
   For the case of `Utils.isPushBasedShuffleEnabled == true` and `BlockStoreShuffleReader.shouldBatchFetch == true`, we have following cases:
   
   * `shuffleStage.shuffleMergeEnabled == false`
     * `mergeStatuses` has only nulls.
     * With current PR, `doBatchFetch` == true, but we take the `if` condition in `convertMapStatuses` - which results in incorrect ordering of blocks (from point of. view of batch fetch).
       * This is the root cause of the hang.
     * With the proposed change above, we will take else block.
       *  Push based shuffle is not relevant for task anyway, and now block fetch will work fine.
     * Additional change - we always take `else` path even when `Utils.isPushBasedShuffleEnabled == true`.
       * This should be functionally equivalent.
       * Now we can potentially benefit from block fetch (when push based shuffle does not kick in - stage retry for example) - so better behavior; but more importantly, should not cause issues.
   * `shuffleStage.shuffleMergeEnabled == true`
     * With this PR, block fetch disabled.
     * With push based shuffle having merged blocks, it takes the `if` block - else it takes `else` block (for direct fetch of unmerged blocks).
       * No behavior change : other than disabling block fetch.
       * This prevents duplicate data fetch issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932669451


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48310/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932657417


   **[Test build #143798 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143798/testReport)** for PR 34156 at commit [`2a7a658`](https://github.com/apache/spark/commit/2a7a658b5272752b845babe20136e99498bec136).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719576838



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       > As demonstrated in this PR inside BlockStoreShuffleReader, we can disable batch fetch when push-based shuffle is enabled and it's an equally simple fix.
   
   Ok, so you mean to disable batch fetch for all the cases when push-based shuffle rather than the partition coalesce case only. Then, of course, the change is also simple.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932673569


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48310/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935233217


   **[Test build #143864 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143864/testReport)** for PR 34156 at commit [`37f77b7`](https://github.com/apache/spark/commit/37f77b7e4a5c01dcd4a3d61ac41558e1b14a8fd5).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931186533


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48276/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932673569


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48310/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720616373



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       Given how close we are to RC, I am fine with this approach.
   
   In general though, we should be very careful about setting expectation that there would be compatibility guarantees with `private[spark]` classes; they are explicitly marked that way to make it very clear not to depend on them. Inspite of that, if there are projects/users depending on it, it is up to them to ensure compatibility - not spark project.
   
   @zhouyejoe Can you evaluate the change that @Ngone51 proposed please ? Internally, it could delegate to the same `getMapSizesByExecutorIdImpl` (which would be what you have here) and simply return response.iter for existing method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935233217


   **[Test build #143864 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143864/testReport)** for PR 34156 at commit [`37f77b7`](https://github.com/apache/spark/commit/37f77b7e4a5c01dcd4a3d61ac41558e1b14a8fd5).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhouyejoe commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720603818



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       Updated the PR, please help review. Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719438161



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       I agree - this condition should be sufficient.
   With both conditions, we will end up not using batch fetch or push based shuffle for coalasced partitions, right ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Victsm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Victsm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720265235



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       We don’t need to touch ShuffleManager API. 
   In fact, we don’t need to touch any API at all after some more internal discussions. 
   @zhouyejoe will update the PR after some more tests to show the change. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720619399



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       @zhouyejoe btw, could you also update the doc of `spark.shuffle.push.enabled` to reflect the change made in this PR, e.g., "push-based shuffle takes priority over batch fetch for the scenario like partition coalesce."




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #34156: [SPARK-36892][Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935548430


   @gengliangwang or @Ngone51, can you please merge to master/branch-3.2 if/when then tests pass ? The PR looks fine to me. Thx !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719450502



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       ~~Not push-based shuffle, but batch fetch is still doable, right?~~
   
   So if "both conditions" refers to "endPartition - startPartition == 1" & "disable batch fetch", I'd say we'd end up not using batch fetch for coalasced partitions. And in anyways, push-based shuffle should not be used for coalasced partitions as it could contain multiple reduce partitions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-930914116


   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935442753


   **[Test build #143866 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143866/testReport)** for PR 34156 at commit [`994a0a1`](https://github.com/apache/spark/commit/994a0a107f493c69f9abbd68a3f2329455cbe99e).
    * This patch **fails Spark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhouyejoe commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935486253


   > The test failure is relevant @zhouyejoe. Can you add `conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")` to all tests which set `conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)` in this Suite ? (Or set it to `conf` after it is created). Looks like something which was probably missed out from Minchu's patch.
   > 
   > The root cause here is that we are modifying the global `conf` - which should not have been done in the original PR, and was probably missed in the reviews. We have to fix it in a follow up jira.
   
   Updated. Thanks. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r722826200



##########
File path: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
##########
@@ -734,4 +738,112 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
       tracker.stop()
     }
   }
+
+  test("SPARK-36892: Batch fetch should be enabled in some scenarios with push based shuffle") {
+    conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
+    conf.set(IS_TESTING, true)

Review comment:
       Can you rebase to latest master ? (to include Minchu's patch ?)
   Specifically, we need `conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")` also here to enable push based shuffle.
   (Same for other tests below as well).

##########
File path: docs/configuration.md
##########
@@ -3166,7 +3166,7 @@ See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this fe
 
 # Push-based shuffle overview
 
-Push-based shuffle helps improve the reliability and performance of spark shuffle. It takes a best-effort approach to push the shuffle blocks generated by the map tasks to remote external shuffle services to be merged per shuffle partition. Reduce tasks fetch a combination of merged shuffle partitions and original shuffle blocks as their input data, resulting in converting small random disk reads by external shuffle services into large sequential reads. Possibility of better data locality for reduce tasks additionally helps minimize network IO.
+Push-based shuffle helps improve the reliability and performance of spark shuffle. It takes a best-effort approach to push the shuffle blocks generated by the map tasks to remote external shuffle services to be merged per shuffle partition. Reduce tasks fetch a combination of merged shuffle partitions and original shuffle blocks as their input data, resulting in converting small random disk reads by external shuffle services into large sequential reads. Possibility of better data locality for reduce tasks additionally helps minimize network IO. Push-based shuffle takes priority over batch fetch for some scenarios, like partition coalesce.

Review comment:
       `like partition coalesce when merged output is available` 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932068453


   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719450502



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       Not push-based shuffle, but batch fetch is still doable, right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Victsm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Victsm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719538616



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       Push based shuffle can work with multiple partitions per reducer. 
   What it cannot work with is when one shuffle partition gets split between multiple reducers in the case of skew handling. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932897628


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143809/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719531296



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       > My concern with only keeping this condition (endPartition - startPartition == 1) instead of disable batch fetch is that it will effectively disable push-based shuffle in 3.2.0 even if the push.enabled flag is set to true.
   
   ~~IUC, push-based shuffle currently can only work with one reduce partition, no?~~ And I didn't see `endPartition - startPartition == 1` disabled push-based shuffle (PBS) for any cases except "partition coalesce".
   
   To clarify, there's no other cases would have multiple reduce partition if PBS wants to take effect due to the limit of  `startMapIndex == 0 && endMapIndex == mapStatuses.length`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932676543


   **[Test build #143798 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143798/testReport)** for PR 34156 at commit [`2a7a658`](https://github.com/apache/spark/commit/2a7a658b5272752b845babe20136e99498bec136).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932727334


   **[Test build #143799 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143799/testReport)** for PR 34156 at commit [`9fe6d85`](https://github.com/apache/spark/commit/9fe6d8561c7cbf95837ba873718be83dc5eb083e).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720708492



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1447,8 +1451,9 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle,
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
-    if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+    if (mergeStatuses.exists(_.nonEmpty) && mergeStatuses.exists(_.exists(_ != null))

Review comment:
       Ideally we can avoid doing this costly check by checking `numAvailableMergeResults` and setting this `mergeStatuses` to `None` before passing to `convertMapStatuses`. But may be we can do that as a follow-up as a separate PR .




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhouyejoe commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-930891292


   cc @mridulm @Ngone51 @venkata91 @gengliangwang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720642425



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       Maybe, an equivalent way to do the same thing by `MapSizesByExecutorId` is:
   
   ```scala
   override def getReader[K, C](
         handle: ShuffleHandle,
         startMapIndex: Int,
         endMapIndex: Int,
         startPartition: Int,
         endPartition: Int,
         context: TaskContext,
         metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
       val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
         handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
      val hasMergedBlock = blocksByAddress.exists(_._1.host == SHUFFLE_MERGER_IDENTIFIER)
       new BlockStoreShuffleReader(
         handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
         shouldBatchFetch =  !hasMergedBlock && canUseBatchFetch(startPartition, endPartition, context))
     }
   ```
   which looks simpler.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720612667



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1447,8 +1451,9 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle,
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
-    if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+    if (mergeStatuses.exists(_.nonEmpty) && mergeStatuses.exists(_.exists(_ != null))
+      && startMapIndex == 0 && endMapIndex == mapStatuses.length) {
+      enableBatchFetch = false

Review comment:
       Would it make sense to add a log line here? Would be useful for debugging. If not `INFO` atleast `DEBUG`

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1447,8 +1451,9 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle,
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
-    if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+    if (mergeStatuses.exists(_.nonEmpty) && mergeStatuses.exists(_.exists(_ != null))

Review comment:
       does this check still makes sense `mergeStatuses.exists(_.nonEmpty)` as it will always be `Some(Array())` when push-based shuffle flag is set to true only will be `None` in the case when push-based shuffle flag is set to false




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935371671


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143864/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhouyejoe commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935213349


   Added unit tests. Also updated the documentation for push based shuffle.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang commented on pull request #34156: [SPARK-36892][Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935625332


   @mridulm Sure. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhouyejoe commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r722832278



##########
File path: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
##########
@@ -734,4 +738,112 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
       tracker.stop()
     }
   }
+
+  test("SPARK-36892: Batch fetch should be enabled in some scenarios with push based shuffle") {
+    conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
+    conf.set(IS_TESTING, true)

Review comment:
       Updated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935417943


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48379/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719450502



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       ~~Not push-based shuffle, but batch fetch is still doable, right?~~
   
   So if "both conditions" refers to "endPartition - startPartition == 1" & "disable batch fetch", I'd say we'd end up not using batch fetch for coalasced partitions(but this isn't expected). And in anyways, push-based shuffle should not be used for coalasced partitions as it could contain multiple reduce partitions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhouyejoe commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720604845



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       UT to be added




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-930914116


   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931253780


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143765/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719531296



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       > My concern with only keeping this condition (endPartition - startPartition == 1) instead of disable batch fetch is that it will effectively disable push-based shuffle in 3.2.0 even if the push.enabled flag is set to true.
   
   IIUC, push-based shuffle currently can only work with one reduce partition, no? And I didn't see `endPartition - startPartition == 1` disabled push-based shuffle for any cases except "partition coalesce".




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720613387



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       This changed the return type, which's supposed to be a part of API. ShuffleManager relies on this could still be broken.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720677348



##########
File path: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
##########
@@ -129,11 +129,20 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       endPartition: Int,
       context: TaskContext,
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
-    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
-      handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+    val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
+    val (blocksByAddress, enableBatchFetch) =
+      if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
+        val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
+          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+        (res.iter, res.enableBatchFetch)

Review comment:
       `res.enableBatchFetch && canUseBatchFetch(startPartition, endPartition, context)` (pls see below as well for alternative)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Victsm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Victsm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719571404



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       > For this way, I'm afraid we have to change the interface ShuffleManager.getReader, which could involve more changes.
   I don't think changing the interface to `ShuffleManager.getReader` is needed.
   As demonstrated in this PR inside `BlockStoreShuffleReader`, we can disable batch fetch when push-based shuffle is enabled and it's an equally simple fix.
   
   The default configs in 3.2.0 will initiate partition coalesce if the size per shuffle partition is less than 1MB, and people might want to enable partition coalesce to fix the many small files problems.
   These valid reasons where partition coalesce is preferred to be enabled might make it even trickier to reason with when push-based shuffle actually gets enabled.
   
   Block batching and push-based shuffle could potentially work together, i.e. we could batch blocks together to be pushed to create merged shuffle files that span across multiple shuffle partitions and enable push-based shuffle and batch fetching to properly work together.
   However, for the sake of unblocking 3.2.0 release, we need a much quicker resolution to this issue.
   For that, when the flags for AQE, partition coalesce, batch fetch, and push-based shuffle are all set to true, we will have to either disable push-based shuffle and leave batch fetch as is with partition coalesce, or we can disable batch fetch and leave push-based shuffle as is with partition coalesce.
   I prefer the 2nd option, because it makes it more consistent for enabling the flag of push-based shuffle and push-based shuffle is a more thorough solution for the problem that batch fetch is trying to solve.
   

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       > For this way, I'm afraid we have to change the interface ShuffleManager.getReader, which could involve more changes.
   
   I don't think changing the interface to `ShuffleManager.getReader` is needed.
   As demonstrated in this PR inside `BlockStoreShuffleReader`, we can disable batch fetch when push-based shuffle is enabled and it's an equally simple fix.
   
   The default configs in 3.2.0 will initiate partition coalesce if the size per shuffle partition is less than 1MB, and people might want to enable partition coalesce to fix the many small files problems.
   These valid reasons where partition coalesce is preferred to be enabled might make it even trickier to reason with when push-based shuffle actually gets enabled.
   
   Block batching and push-based shuffle could potentially work together, i.e. we could batch blocks together to be pushed to create merged shuffle files that span across multiple shuffle partitions and enable push-based shuffle and batch fetching to properly work together.
   However, for the sake of unblocking 3.2.0 release, we need a much quicker resolution to this issue.
   For that, when the flags for AQE, partition coalesce, batch fetch, and push-based shuffle are all set to true, we will have to either disable push-based shuffle and leave batch fetch as is with partition coalesce, or we can disable batch fetch and leave push-based shuffle as is with partition coalesce.
   I prefer the 2nd option, because it makes it more consistent for enabling the flag of push-based shuffle and push-based shuffle is a more thorough solution for the problem that batch fetch is trying to solve.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719119279



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       I think this enhanced condition is enough to fix the issue. Why do we still have to disable the batch fetch?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935390374


   The test failure is relevant @zhouyejoe.
   Can you add `conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")` to all tests which set `conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)` in this Suite ?
   Looks like something which was probably missed out from Minchu's patch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Victsm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Victsm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719503545



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       Why not keep the behavior of push-based shuffle and disable the behavior of batch fetch when partition coalesce happens?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932721857


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48311/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720677576



##########
File path: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
##########
@@ -129,11 +129,20 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       endPartition: Int,
       context: TaskContext,
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
-    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
-      handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+    val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
+    val (blocksByAddress, enableBatchFetch) =
+      if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
+        val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
+          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+        (res.iter, res.enableBatchFetch)
+      } else {
+        val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
+          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+        (address, canUseBatchFetch(startPartition, endPartition, context))
+      }
     new BlockStoreShuffleReader(
       handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
-      shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
+      shouldBatchFetch = enableBatchFetch)
   }

Review comment:
       nit: Cleaner would be:
   
   * rename `enableBatchFetch` to `canEnableBatchFetch`.
   * Set to `true` in else block and `res.enableBatchFetch` in `if` block
   * here, do: `shouldBatchFetch = canEnableBatchFetch && canUseBatchFetch(startPartition, endPartition, context)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719669365



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       To elaborate:
   
   * If push based shuffle is enabled for a stage we want to disable block fetch.
     * The check in `BlockStoreShuffleReader` does this.
   * If block fetch is enabled, we want to make sure it always goes through the `else` condition here.
     * block fetch assumes ordering of id's which get violated in the `if` block here.
   
   For the case of `Utils.isPushBasedShuffleEnabled == true` and `BlockStoreShuffleReader.shouldBatchFetch == true`, we have following cases:
   
   * `shuffleStage.shuffleMergeEnabled == false`
     * `mergeStatuses != null` and `mergeStatuses.length == numReducers` though `mergeStatuses` has only nulls.
     * With current PR, `doBatchFetch` == true, but we take the `if` condition in `convertMapStatuses` - which results in incorrect ordering of blocks.
     * With the proposed change above, we will take else block.
       * This is the root cause of the hang.
     * Additional change - we always take `else` path even for push based shuffle in this case.
       * This should be functionally equivalent - since we can now rely on block fetch when push based shuffle does not kick in (stage retry for example) - so better behavior; but more importantly, should not cause issues.
   * `shuffleStage.shuffleMergeEnabled == true`
     * With this PR, block fetch disabled.
     * With push based shuffle having merged blocks, it takes the `if` block - else it takes `else` block (for direct fetch of unmerged blocks).
       * No behavior change : other than disabling block fetch.
       * This prevents duplicate data fetch issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719669365



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       To elaborate:
   
   * If push based shuffle is enabled for a stage we want to disable block fetch.
     * The check in `BlockStoreShuffleReader` does this.
   * If block fetch is enabled, we want to make sure it always goes through the `else` condition here.
     * block fetch assumes ordering of id's which get violated in the `if` block here.
   
   
   For background, `mergeStatuses != null` and `mergeStatuses.length == numReducers` when `Utils.isPushBasedShuffleEnabled == true` and `mergeStatuses = Array.empty` (or `null` depending on codepath) when it is disabled.
   
   For the case of `Utils.isPushBasedShuffleEnabled == true` and `BlockStoreShuffleReader.shouldBatchFetch == true`, we have following cases:
   
   * `shuffleStage.shuffleMergeEnabled == false`
     * `mergeStatuses` has only nulls.
     * With current PR, `doBatchFetch` == true, but we take the `if` condition in `convertMapStatuses` - which results in incorrect ordering of blocks.
       * This is the root cause of the hang.
     * With the proposed change above, we will take else block.
       *  Push based shuffle is not relevant for task anyway, and now block fetch will work fine.
     * Additional change - we always take `else` path even when `Utils.isPushBasedShuffleEnabled == true`.
       * This should be functionally equivalent.
       * Now we can potentially benefit from block fetch (when push based shuffle does not kick in - stage retry for example) - so better behavior; but more importantly, should not cause issues.
   * `shuffleStage.shuffleMergeEnabled == true`
     * With this PR, block fetch disabled.
     * With push based shuffle having merged blocks, it takes the `if` block - else it takes `else` block (for direct fetch of unmerged blocks).
       * No behavior change : other than disabling block fetch.
       * This prevents duplicate data fetch issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932664583


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48310/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720675561



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       @Ngone51 We would be trying to infer behavior in this case based on the response, instead of being explicit about what should be enabled/disabled (btw, `blocksByAddress` is iterator, so we would need a `.toList` or some such as well).
   
   The implication of that is, for example, without the `mergeStatuses.exists(_.exists(_ != null))` in `convertMapStatuses`, `hasMergedBlock` will be false - and so we would enable batch fetch - and fail.
   I want to avoid tight coupling based on pattern of blocks in response, and instead explicitly let the response indicate whether we should enable/disable feature. 
   This should allow us to reasonably independently evolve both features in future as well (for example, if push based shuffle starts supporting fetching subset of mapper outputs for some conditions)
   
   Thoughts ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931714490


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48283/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932897510


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48321/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931601860


   **[Test build #143772 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143772/testReport)** for PR 34156 at commit [`bce8c46`](https://github.com/apache/spark/commit/bce8c460f068d596d62f656d83e45bc27592443d).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Victsm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Victsm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719747636



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       @Ngone51 
   Just take a look at the usage of `PartialMapperPartitionSpec` and `CoalescedMapperPartitionSpec` for the local shuffle read optimization.
   When the local shuffle read optimization kicks in, it indeed makes sense to not do push-based shuffle for that special case since it's supposed to be a local shuffle read for changing partitions of the table after AQE-converted bhj.
   In these cases, because the reducer for the local shuffle is reading from a subset of the mappers, push-based shuffle cannot happen in the first place.
   However, the current condition for disabling batch fetch in `BlockStoreShuffleReader` would only check if push-based shuffle is enabled for the given shuffle dependency, and it will still disable batch fetch even if push-based shuffle cannot happen because the reducer is not fetching blocks from all the mappers.
   We are evaluating passing this additional information to `BlockStoreShuffleReader` as well so that we can properly disable batch fetch only for cases of partition coalesce but not for local shuffle read.
   The PR will be updated with this change once validated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719637656



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       I believe this should suffice:
   ```
       if (mergeStatuses.exists(_.nonEmpty) && mergeStatuses.exists(_.exists(_ != null))
         && startMapIndex == 0
         && endMapIndex == mapStatuses.length) {
   ```
   
   We can remove the changes to `BlockStoreShuffleReader`.
   Thoughts ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [SPARK-36892][Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935646336


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48383/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [SPARK-36892][Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935764946


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143871/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935335868


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48377/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935438470


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48379/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [SPARK-36892][Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935672654


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48383/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931774843


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143772/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932881227


   **[Test build #143809 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143809/testReport)** for PR 34156 at commit [`b17fe5a`](https://github.com/apache/spark/commit/b17fe5ab0e13d1f6d29ae99d31970904d98c55a2).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719669365



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       To elaborate:
   
   * If push based shuffle is enabled for a stage we want to disable block fetch.
     * The check in `BlockStoreShuffleReader` does this.
   * If block fetch is enabled, we want to make sure it always goes through the `else` condition here.
     * block fetch assumes ordering of id's which get violated in the `if` block here.
   
   For the case of `Utils.isPushBasedShuffleEnabled == true` and `BlockStoreShuffleReader.shouldBatchFetch == true`, we have following cases:
   
   * `shuffleStage.shuffleMergeEnabled == false`
     * `mergeStatuses != null` and `mergeStatuses.length == numReducers` though `mergeStatuses` has only nulls.
     * With current PR, `doBatchFetch` == true, but we take the `if` condition in `convertMapStatuses` - which results in incorrect ordering of blocks.
       * This is the root cause of the hang.
     * With the proposed change above, we will take else block.
       *  Push based shuffle is not relevant for task anyway, and now block fetch will work fine.
     * Additional change - we always take `else` path even when `Utils.isPushBasedShuffleEnabled == true`.
       * This should be functionally equivalent.
       * Now we can potentially benefit from block fetch (when push based shuffle does not kick in - stage retry for example) - so better behavior; but more importantly, should not cause issues.
   * `shuffleStage.shuffleMergeEnabled == true`
     * With this PR, block fetch disabled.
     * With push based shuffle having merged blocks, it takes the `if` block - else it takes `else` block (for direct fetch of unmerged blocks).
       * No behavior change : other than disabling block fetch.
       * This prevents duplicate data fetch issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhouyejoe commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720641295



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       Updated PR. UT to be added.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932679441


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143798/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932702838


   **[Test build #143799 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143799/testReport)** for PR 34156 at commit [`9fe6d85`](https://github.com/apache/spark/commit/9fe6d8561c7cbf95837ba873718be83dc5eb083e).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931714490


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48283/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931160777


   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932702838


   **[Test build #143799 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143799/testReport)** for PR 34156 at commit [`9fe6d85`](https://github.com/apache/spark/commit/9fe6d8561c7cbf95837ba873718be83dc5eb083e).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [SPARK-36892][Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935672654


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48383/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935285735


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48377/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720675561



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       @Ngone51 We would be trying to infer behavior in this case based on the response, instead of being explicit about what should be enabled/disabled (btw, `blocksByAddress` is iterator, so we would need a `.toList` or some such as well).
   
   The implication of that is, for example, without the `mergeStatuses.exists(_.exists(_ != null))` in `convertMapStatuses`, `hasMergedBlock` will be false - and so we would enable batch fetch - and fail.
   I want to avoid tight coupling based on pattern of blocks in response, and instead explicitly let the response indicate whether we should enable/disable feature. 
   This should allow us to reasonably independently evolve both features in future as well.
   
   Essentially, I want to get the interfaces right - the actual impl, as long as it is correct for now, can be evolved in future as well.
   
   Thoughts ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931601860


   **[Test build #143772 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143772/testReport)** for PR 34156 at commit [`bce8c46`](https://github.com/apache/spark/commit/bce8c460f068d596d62f656d83e45bc27592443d).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932881227


   **[Test build #143809 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143809/testReport)** for PR 34156 at commit [`b17fe5a`](https://github.com/apache/spark/commit/b17fe5ab0e13d1f6d29ae99d31970904d98c55a2).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931122812


   **[Test build #143765 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143765/testReport)** for PR 34156 at commit [`579fe81`](https://github.com/apache/spark/commit/579fe81d6640d4e090d8a89b36525c3ec5365899).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931253780


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143765/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932728892


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143799/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720677348



##########
File path: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
##########
@@ -129,11 +129,20 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       endPartition: Int,
       context: TaskContext,
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
-    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
-      handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+    val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
+    val (blocksByAddress, enableBatchFetch) =
+      if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
+        val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
+          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+        (res.iter, res.enableBatchFetch)

Review comment:
       `res.enableBatchFetch && canUseBatchFetch(startPartition, endPartition, context)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935305619


   **[Test build #143866 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143866/testReport)** for PR 34156 at commit [`994a0a1`](https://github.com/apache/spark/commit/994a0a107f493c69f9abbd68a3f2329455cbe99e).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935444522


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143866/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935371232


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48377/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935371232






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719968691



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       > No behavior change : other than disabling block fetch.
   
   @mridulm But this is the major concern from me now by disabling batch fetch for all the cases when PBS is enabled. Because the case like `PartialMapperPartitionSpec` can no longer use the batch fetch when PBS is enabled (which could have perf issue), while it could use in other ways:
    1) `endPartition - startPartition == 1`
    2) only disable batch fetch for the case of partition coalesce rather than all the cases
     And sound like @Victsm wants to take the way 2) as he said:
   
   > We are evaluating passing this additional information to BlockStoreShuffleReader as well so that we can properly disable batch fetch only for cases of partition coalesce but not for local shuffle read.
   
   For 2), just in case you may touch`ShuffleManager`: there're users who use `ShuffleManager` as API to implement their own manager, though it's private API. But I still don't want to see changes in this interface to cause compatibility issues in 3.2 at this monent.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm edited a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm edited a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935390374


   The test failure is relevant @zhouyejoe.
   Can you add `conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")` to all tests which set `conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)` in this Suite ?
   Looks like something which was probably missed out from Minchu's patch.
   
   The root cause here is that we are modifying the global `conf` - which should not have been done in the original PR, and was probably missed in the reviews. We have to fix it in a follow up jira.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932068453


   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [SPARK-36892][Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935725149


   **[Test build #143871 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143871/testReport)** for PR 34156 at commit [`9d34e03`](https://github.com/apache/spark/commit/9d34e035828ccbb09061ee5f12a913eef1f07f10).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719637656



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       I believe this should suffice:
   ```
       if (mergeStatuses.exists(_.nonEmpty) && mergeStatuses.exists(_.exists(_ != null))
         && startMapIndex == 0
         && endMapIndex == mapStatuses.length) {
   ```
   
   and we can keep the changes to `BlockStoreShuffleReader`.
   Thoughts ?
   
   (mergeStatuses.exists(_.nonEmpty) cheaply remove non-push based shuffle cases)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Victsm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Victsm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719496790



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       Push-based shuffle in practice shouldn't interfere with partition coalesce.
   A reducer should be able to fetch from multiple shuffle partitions as separate merged shuffle partitions, and that's the current behavior implemented in this code.
   The only thing that prevents it from working properly is that `ShuffleBlockFetcherIterator` later on tries to merge continuous blocks generated in this case for batch fetch, which is totally unnecessary.
   In 3.2.0, AQE, partition coalesce, and batch fetch are all default to true.
   My concern with only keeping this condition (`endPartition - startPartition == 1`) instead of disable batch fetch is that it will effectively disable push-based shuffle in 3.2.0 even if the push.enabled flag is set to true.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932897510


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48321/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720687901



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1447,8 +1451,9 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle,
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
-    if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+    if (mergeStatuses.exists(_.nonEmpty) && mergeStatuses.exists(_.exists(_ != null))

Review comment:
       Agree, `mergeStatuses.exists(_.exists(_ != null)` should be enough now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r721552158



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1447,8 +1451,9 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle,
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
-    if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+    if (mergeStatuses.exists(_.nonEmpty) && mergeStatuses.exists(_.exists(_ != null))

Review comment:
       @zhouyejoe Do we still need this check `mergeStatuses.exists(_.nonEmpty)`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhouyejoe commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720616672



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       Ok. Will try it out tonight!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm edited a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm edited a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935390374


   The test failure is relevant @zhouyejoe.
   Can you add `conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")` to all tests which set `conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)` in this Suite ? (Or set it to `conf` after it is created).
   Looks like something which was probably missed out from Minchu's patch.
   
   The root cause here is that we are modifying the global `conf` - which should not have been done in the original PR, and was probably missed in the reviews. We have to fix it in a follow up jira.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719556271



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       > Push based shuffle can work with multiple partitions per reducer.
   
   Yeah, I get it after second thinking. But given that, currently, there's actually no case that has multiple partitions, would  
   `endPartition - startPartition == 1` be a simpler fix to go?
   
   > Why not keep the behavior of push-based shuffle and disable the behavior of batch fetch when partition coalesce happens?
   
   For this way, I'm afraid we have to change the interface `ShuffleManager.getReader`, which could involve more changes.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935352266


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48379/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhouyejoe commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720641295



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       Didn't get enough time to work it tonight. Will update the PR tomorrow ASAP.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Victsm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
Victsm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719496790



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
     if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
-      && endMapIndex == mapStatuses.length) {
+      && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {

Review comment:
       Push-based shuffle in practice shouldn't interfere with partition coalesce.
   A reducer should be able to fetch from multiple shuffle partitions as separate merged shuffle partitions.
   In 3.2.0, AQE, partition coalesce, and batch fetch are all default to true.
   My concern with this condition is that it will effectively disable push-based shuffle in 3.2.0 even if the push.enabled flag is set to true.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931252643


   **[Test build #143765 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143765/testReport)** for PR 34156 at commit [`579fe81`](https://github.com/apache/spark/commit/579fe81d6640d4e090d8a89b36525c3ec5365899).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931122812


   **[Test build #143765 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143765/testReport)** for PR 34156 at commit [`579fe81`](https://github.com/apache/spark/commit/579fe81d6640d4e090d8a89b36525c3ec5365899).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [SPARK-36892][Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935565750


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48383/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-931700200


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48283/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720675561



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       @Ngone51 We would be trying to infer behavior in this case based on the response, instead of being explicit about what should be enabled/disabled (btw, `blocksByAddress` is iterator, so we would need a `.toList` or some such as well).
   
   The implication of that is, for example, without the `mergeStatuses.exists(_.exists(_ != null))` in `convertMapStatuses`, `hasMergedBlock` will be false - and so we would enable batch fetch - and fail.
   I want to avoid tight coupling based on pattern of blocks in response, and instead explicitly let the response indicate whether we should enable/disable feature. 
   
   Thoughts ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720680424



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -529,7 +532,32 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+    getPushBasedShuffleMapSizesByExecutorId(
+      shuffleId, startMapIndex, endMapIndex, startPartition, endPartition).iter

Review comment:
       Maybe assert that `enableBatchFetch == true` ? We should not have the other case here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34156: [SPARK-36892][Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-935764946


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143871/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34156: [WIP] [SPARK-36892] [Core] Disable batch fetch for a shuffle when push based shuffle is enabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34156:
URL: https://github.com/apache/spark/pull/34156#issuecomment-932721857


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48311/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org