Posted to commits@spark.apache.org by mr...@apache.org on 2021/07/17 05:27:10 UTC

[spark] branch master updated: [SPARK-32922][SHUFFLE][CORE][FOLLOWUP] Fixes few issues when the executor tries to fetch push-merged blocks

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d2cbad  [SPARK-32922][SHUFFLE][CORE][FOLLOWUP] Fixes few issues when the executor tries to fetch push-merged blocks
6d2cbad is described below

commit 6d2cbadcfe3b4badad9400c3bedf697ea6196ffa
Author: Chandni Singh <si...@gmail.com>
AuthorDate: Sat Jul 17 00:26:46 2021 -0500

    [SPARK-32922][SHUFFLE][CORE][FOLLOWUP] Fixes few issues when the executor tries to fetch push-merged blocks
    
    ### What changes were proposed in this pull request?
    The following two bugs were introduced by https://github.com/apache/spark/pull/32140:
    1. Instead of requesting the local dirs for push-merged-local blocks from the ESS, `PushBasedFetchHelper` requests them from other executors. Push-based shuffle is only enabled when the ESS is enabled, so it should always fetch the dirs from the ESS and never from other executors, which is not yet supported.
    2. The size of the push-merged blocks is logged incorrectly.
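
    A minimal sketch of bug (2), assuming each block info is a tuple of (block id, size in bytes, map index) and that push-merged blocks use -1 as a placeholder map index. The object name, the second block id, and the sizes are made up for illustration; this is not the Spark code itself.

    ```scala
    // Hypothetical stand-in for the block tuples: (blockId, sizeInBytes, mapIndex).
    object PushMergedSizeSketch {
      type BlockInfo = (String, Long, Int)

      // Assumed placeholder map index of -1 for push-merged blocks; sizes are invented.
      val blockInfos: Seq[BlockInfo] = Seq(
        ("shuffle_0_-1_13", 1024L, -1),
        ("shuffle_0_-1_14", 2048L, -1)
      )

      // Before the fix: sums the third field, which is the map index, not a size.
      val buggyBytes: Long = blockInfos.map(_._3).sum.toLong

      // After the fix: sums the second field, the size in bytes.
      val fixedBytes: Long = blockInfos.map(_._2).sum

      def main(args: Array[String]): Unit = {
        println(s"buggy=$buggyBytes fixed=$fixedBytes") // prints "buggy=-2 fixed=3072"
      }
    }
    ```

    Under these assumptions the old expression would report a small negative number instead of the merged block bytes, which matches the symptom of an incorrectly logged size.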
    
    ### Why are the changes needed?
    This fixes the above-mentioned bugs and is needed for push-based shuffle to work properly.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Tested this by running an application on the cluster. The UTs mock the call `hostLocalDirManager.getHostLocalDirs`, which is why they didn't catch (1). However, the fix is trivial, and checking this in the UTs would require a lot more effort, so I haven't modified them.
    Logs of the executor with the bug:
    ```
    21/07/15 15:42:46 WARN ExternalBlockStoreClient: Error while trying to get the host local dirs for [shuffle-push-merger]
    21/07/15 15:42:46 WARN PushBasedFetchHelper: Error while fetching the merged dirs for push-merged-local blocks: shuffle_0_-1_13. Fetch the original blocks instead
    java.lang.RuntimeException: java.lang.IllegalStateException: Invalid executor id: shuffle-push-merger, expected 92.
    	at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:130)
    	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
    ```
    After the fix, the executors were able to fetch the local push-merged blocks.
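
    A toy model of why the request target matters, based only on the log above: an executor endpoint rejects a local-dirs request for any id other than its own, while the external shuffle service is assumed to answer for the `shuffle-push-merger` id. All types, names, and directory paths below are hypothetical and do not correspond to Spark classes.

    ```scala
    object MergedDirsTargetSketch {
      val MergerId = "shuffle-push-merger"

      sealed trait Server {
        def localDirs(requestedId: String): Either[String, Seq[String]]
      }

      // Toy executor endpoint: only answers for its own executor id, as in the log above.
      final case class ExecutorServer(execId: String) extends Server {
        def localDirs(requestedId: String): Either[String, Seq[String]] =
          if (requestedId == execId) Right(Seq(s"/tmp/exec-$execId"))
          else Left(s"Invalid executor id: $requestedId, expected $execId.")
      }

      // Toy shuffle service endpoint: assumed to know the merged dirs for the merger id.
      final case class ShuffleService(mergedDirs: Seq[String]) extends Server {
        def localDirs(requestedId: String): Either[String, Seq[String]] =
          if (requestedId == MergerId) Right(mergedDirs)
          else Left(s"Unknown id: $requestedId")
      }

      val executor: Server = ExecutorServer("92")
      val shuffleService: Server = ShuffleService(Seq("/tmp/merge_manager"))

      // Before the fix: the request reaches an executor and is rejected.
      val beforeFix: Either[String, Seq[String]] = executor.localDirs(MergerId)

      // After the fix: the request reaches the shuffle service and succeeds.
      val afterFix: Either[String, Seq[String]] = shuffleService.localDirs(MergerId)

      def main(args: Array[String]): Unit = {
        println(beforeFix)
        println(afterFix)
      }
    }
    ```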
    
    Closes #33378 from otterc/SPARK-32922-followup.
    
    Authored-by: Chandni Singh <si...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../org/apache/spark/storage/PushBasedFetchHelper.scala    | 14 +++++++++-----
 .../apache/spark/storage/ShuffleBlockFetcherIterator.scala |  6 +++---
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
index 63f42a0..096ea24 100644
--- a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
+++ b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
@@ -36,8 +36,8 @@ import org.apache.spark.storage.ShuffleBlockFetcherIterator._
  * Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based
  * functionality to fetch push-merged block meta and shuffle chunks.
  * A push-merged block contains multiple shuffle chunks where each shuffle chunk contains multiple
- * shuffle blocks that belong to the common reduce partition and were merged by the ESS to that
- * chunk.
+ * shuffle blocks that belong to the common reduce partition and were merged by the
+ * external shuffle service to that chunk.
  */
 private class PushBasedFetchHelper(
    private val iterator: ShuffleBlockFetcherIterator,
@@ -197,9 +197,13 @@ private class PushBasedFetchHelper(
           localShuffleMergerBlockMgrId)
       }
     } else {
-      logDebug(s"Asynchronous fetch the push-merged-local blocks without cached merged dirs")
-      hostLocalDirManager.getHostLocalDirs(localShuffleMergerBlockMgrId.host,
-        localShuffleMergerBlockMgrId.port, Array(SHUFFLE_MERGER_IDENTIFIER)) {
+      // Push-based shuffle is only enabled when the external shuffle service is enabled. If the
+      // external shuffle service is not enabled, then there will not be any push-merged blocks
+      // for the iterator to fetch.
+      logDebug(s"Asynchronous fetch the push-merged-local blocks without cached merged " +
+        s"dirs from the external shuffle service")
+      hostLocalDirManager.getHostLocalDirs(blockManager.blockManagerId.host,
+        blockManager.externalShuffleServicePort, Array(SHUFFLE_MERGER_IDENTIFIER)) {
         case Success(dirs) =>
           logDebug(s"Fetched merged dirs in " +
             s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 094c3b5..d03f20a 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -386,7 +386,7 @@ final class ShuffleBlockFetcherIterator(
         if (address.host == blockManager.blockManagerId.host) {
           numBlocksToFetch += blockInfos.size
           pushMergedLocalBlocks ++= blockInfos.map(_._1)
-          pushMergedLocalBlockBytes += blockInfos.map(_._3).sum
+          pushMergedLocalBlockBytes += blockInfos.map(_._2).sum
         } else {
           collectFetchRequests(address, blockInfos, collectedRemoteRequests)
         }
@@ -886,8 +886,8 @@ final class ShuffleBlockFetcherIterator(
           //    blockId is a ShuffleBlockChunkId.
           // 2. Failure to read the push-merged-local meta. In this case, the blockId is
           //    ShuffleBlockId.
-          // 3. Failure to get the push-merged-local directories from the ESS. In this case, the
-          //    blockId is ShuffleBlockId.
+          // 3. Failure to get the push-merged-local directories from the external shuffle service.
+          //    In this case, the blockId is ShuffleBlockId.
           if (pushBasedFetchHelper.isRemotePushMergedBlockAddress(address)) {
             numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
             bytesInFlight -= size
