Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/09 14:54:10 UTC

[GitHub] [spark] tgravescs commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

tgravescs commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r452276119



##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -478,30 +475,43 @@ final class ShuffleBlockFetcherIterator(
       logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " +
         s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}")
       val execIdsWithoutDirs = immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray
-      hostLocalDirManager.getHostLocalDirs(execIdsWithoutDirs) {
-        case Success(dirs) =>
-          immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) =>
-            blockInfos.takeWhile { case (blockId, _, mapIndex) =>
-              fetchHostLocalBlock(
-                blockId,
-                mapIndex,
-                dirs.get(hostLocalBmId.executorId),
-                hostLocalBmId)
+
+      val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) {
+        val host = blockManager.blockManagerId.host
+        val port = blockManager.externalShuffleServicePort
+        Seq((host, port, immutableHostLocalBlocksWithoutDirs.keys.toArray))
+      } else {
+        hostLocalBlocksByExecutor.keysIterator

Review comment:
       isn't this essentially calculating immutableHostLocalBlocksWithoutDirs again? Its keys already contain the BlockManagerIds of the executors missing from the cache.
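
       A hedged sketch of the simplification this comment points at: the keys of
       immutableHostLocalBlocksWithoutDirs are exactly the BlockManagerIds whose
       dirs are missing from the cache, so the non-ESS branch could iterate them
       directly. The (host, port, Array(bmId)) request shape mirrors the
       external-shuffle-service branch above; the per-executor mapping is an
       assumption, since the diff is truncated before that point.

           import org.apache.spark.storage.BlockManagerId

           // Hedged sketch (not the PR's actual code): build one dir-fetch request
           // per executor whose local dirs are not cached yet, reusing the keys of
           // immutableHostLocalBlocksWithoutDirs instead of re-filtering
           // hostLocalBlocksByExecutor.
           def dirFetchRequestsWithoutEss(
               blocksWithoutCachedDirs: Map[BlockManagerId, Seq[Any]])
             : Seq[(String, Int, Array[BlockManagerId])] = {
             blocksWithoutCachedDirs.keysIterator
               .map(bmId => (bmId.host, bmId.port, Array(bmId)))
               .toSeq
           }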

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##########
@@ -61,4 +63,17 @@ public MetricSet shuffleMetrics() {
     // Return an empty MetricSet by default.
     return () -> Collections.emptyMap();
   }
+
+  /**
+   * Request the local disk directories, which are specified by DiskBlockManager, for the executors
+   * from the external shuffle service (when this is an ExternalBlockStoreClient) or BlockManager
+   * (when this is a NettyBlockTransferService). Note there's only one executor when this is a
+   * NettyBlockTransferService.

Review comment:
       might be nice to extend this to say it's because we ask one specific executor at a time.
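
       For context, a hedged usage sketch of this API (client, host, port, and
       execId are placeholder names, not from the PR; the signature is taken from
       the NettyBlockTransferService override quoted further below):

           import java.util.{Map => JMap}
           import java.util.concurrent.CompletableFuture

           import org.apache.spark.network.shuffle.BlockStoreClient

           // Hedged sketch: ask one executor for its local dirs and react when
           // the supplied future completes with an executorId -> localDirs map.
           def fetchLocalDirs(
               client: BlockStoreClient, host: String, port: Int, execId: String): Unit = {
             val hostLocalDirs = new CompletableFuture[JMap[String, Array[String]]]()
             client.getHostLocalDirs(host, port, Array(execId), hostLocalDirs)
             hostLocalDirs.whenComplete { (dirs: JMap[String, Array[String]], err: Throwable) =>
               if (err != null) {
                 // request failed: a real caller would fall back to remote fetches
                 err.printStackTrace()
               } else {
                 // dirs.get(execId) holds that executor's local disk directories
                 println(dirs.get(execId).mkString(", "))
               }
             }
           }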

##########
File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
##########
@@ -194,6 +196,45 @@ private[spark] class NettyBlockTransferService(
     result.future
   }
 
+  override def getHostLocalDirs(
+      host: String,
+      port: Int,
+      execIds: Array[String],
+      hostLocalDirsCompletable: CompletableFuture[util.Map[String, Array[String]]]): Unit = {
+    val getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds)
+    try {
+      val client = clientFactory.createClient(host, port)
+      client.sendRpc(getLocalDirsMessage.toByteBuffer, new RpcResponseCallback() {
+        override def onSuccess(response: ByteBuffer): Unit = {
+          try {
+            val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response)
+            hostLocalDirsCompletable.complete(
+              msgObj.asInstanceOf[LocalDirsForExecutors].getLocalDirsByExec)
+          } catch {
+            case t: Throwable =>
+              logWarning(s"Error trying to get the host local dirs for executor ${execIds.head}",
+                t.getCause)
+              hostLocalDirsCompletable.completeExceptionally(t)
+          } finally {
+            client.close()
+          }
+        }
+
+        override def onFailure(t: Throwable): Unit = {
+          logWarning(s"Error trying to get the host local dirs for executor ${execIds.head}",
+            t.getCause)
+          hostLocalDirsCompletable.completeExceptionally(t)
+          client.close()
+        }
+      })
+    } catch {
+      case e: IOException =>

Review comment:
       is there a reason we only catch these 2? Do you think any others are unknown? I'm assuming those would cause the executor to exit?
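
       A hedged sketch of one broader alternative this question points at: catch
       anything non-fatal and complete the future exceptionally so the caller
       never hangs, while fatal errors still propagate (and, presumably, take the
       executor down). Illustrative only; the PR's full set of catch clauses is
       not shown in this hunk.

           import java.util.concurrent.CompletableFuture

           import scala.util.control.NonFatal

           // Hedged sketch: run the request and surface any non-fatal failure
           // through the future instead of letting it escape unhandled.
           def requestDirs[T](future: CompletableFuture[T])(sendRequest: => Unit): Unit = {
             try {
               sendRequest
             } catch {
               case NonFatal(e) => future.completeExceptionally(e)
             }
           }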

##########
File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
##########
@@ -113,6 +113,14 @@ class NettyBlockRpcServer(
             s"when there is not sufficient space available to store the block.")
           responseContext.onFailure(exception)
         }
+
+      case req: GetLocalDirsForExecutors =>

Review comment:
       might be nice to rename req to getLocalDirs to keep the naming convention of the other cases.
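
       A hedged sketch of the suggested rename (the handler body below is
       hypothetical; the diff does not show it):

           import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, GetLocalDirsForExecutors}

           // Hedged sketch: bind the matched message to a name derived from its
           // type, like the other cases in NettyBlockRpcServer, instead of the
           // generic `req`.
           def describe(message: BlockTransferMessage): String = message match {
             case getLocalDirs: GetLocalDirsForExecutors =>
               s"local-dirs request for executors ${getLocalDirs.execIds.mkString(", ")}"
             case other =>
               s"unhandled message ${other.getClass.getSimpleName}"
           }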

##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -478,30 +475,43 @@ final class ShuffleBlockFetcherIterator(
       logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " +
         s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}")
       val execIdsWithoutDirs = immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray
-      hostLocalDirManager.getHostLocalDirs(execIdsWithoutDirs) {
-        case Success(dirs) =>
-          immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) =>
-            blockInfos.takeWhile { case (blockId, _, mapIndex) =>
-              fetchHostLocalBlock(
-                blockId,
-                mapIndex,
-                dirs.get(hostLocalBmId.executorId),
-                hostLocalBmId)
+
+      val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) {
+        val host = blockManager.blockManagerId.host
+        val port = blockManager.externalShuffleServicePort
+        Seq((host, port, immutableHostLocalBlocksWithoutDirs.keys.toArray))
+      } else {
+        hostLocalBlocksByExecutor.keysIterator
+          .filter(exec => execIdsWithoutDirs.contains(exec.executorId))

Review comment:
       exec here is really a bmId (BlockManagerId), perhaps rename it to be clear.
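
       A hedged sketch of that rename as a standalone stand-in for the surrounding
       code (parameter names mirror the diff; the method itself is hypothetical):

           import org.apache.spark.storage.BlockManagerId

           // Hedged sketch: the keys of hostLocalBlocksByExecutor are
           // BlockManagerIds, so binding them as `bmId` instead of `exec` makes
           // the filter read clearly.
           def executorsMissingDirs(
               hostLocalBlocksByExecutor: Map[BlockManagerId, Seq[Any]],
               execIdsWithoutDirs: Array[String]): Iterator[BlockManagerId] = {
             hostLocalBlocksByExecutor.keysIterator
               .filter(bmId => execIdsWithoutDirs.contains(bmId.executorId))
           }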



