Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/02/25 13:31:58 UTC

[GitHub] [spark] cloud-fan commented on a change in pull request #22173: [SPARK-24355] Spark external shuffle server improvement to better handle block fetch requests.

URL: https://github.com/apache/spark/pull/22173#discussion_r383878894
 
 

 ##########
 File path: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 ##########
 @@ -144,14 +186,23 @@ public TransportChannelHandler initializePipeline(
       RpcHandler channelRpcHandler) {
     try {
       TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
-      channel.pipeline()
+      ChunkFetchRequestHandler chunkFetchHandler =
+        createChunkFetchHandler(channelHandler, channelRpcHandler);
+      ChannelPipeline pipeline = channel.pipeline()
         .addLast("encoder", ENCODER)
         .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
         .addLast("decoder", DECODER)
-        .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
+        .addLast("idleStateHandler",
+          new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
         // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
         // would require more logic to guarantee if this were not part of the same event loop.
         .addLast("handler", channelHandler);
+      // Use a separate EventLoopGroup to handle ChunkFetchRequest messages for shuffle rpcs.
+      if (conf.getModuleName() != null &&
+          conf.getModuleName().equalsIgnoreCase("shuffle")
+          && !isClientOnly) {
+        pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler);
 
 Review comment:
   Why does `chunkFetchWorkers` need to be global? Can't we create one here every time?
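
   To make the trade-off behind this question concrete, here is a minimal sketch. It uses plain `java.util.concurrent` executors as a stand-in for Netty's `EventLoopGroup`; that substitution, the class name `WorkerPoolSketch`, and the method `distinctWorkerThreads` are illustrative assumptions, not code from this PR. It counts how many worker threads end up running tasks when every channel shares one pool versus when a new pool is created per channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only: executors stand in for Netty EventLoopGroups.
public class WorkerPoolSketch {

  // Simulates `channels` connections, each handing one fetch task to a worker
  // pool, and returns the number of distinct threads that ran those tasks.
  static int distinctWorkerThreads(int channels, boolean sharePool) {
    Set<Long> threadIds = ConcurrentHashMap.newKeySet();
    List<ExecutorService> pools = new ArrayList<>();
    List<Future<?>> pending = new ArrayList<>();

    ExecutorService shared = null;
    if (sharePool) {
      // One pool of 2 threads, reused by every simulated channel.
      shared = Executors.newFixedThreadPool(2);
      pools.add(shared);
    }
    try {
      for (int i = 0; i < channels; i++) {
        ExecutorService pool = shared;
        if (pool == null) {
          // A fresh pool per channel, analogous to creating the group
          // inside the pipeline-initialization method on every call.
          pool = Executors.newFixedThreadPool(2);
          pools.add(pool);
        }
        pending.add(pool.submit(() -> {
          threadIds.add(Thread.currentThread().getId());
        }));
      }
      // Wait for every task so the thread count is complete.
      for (Future<?> f : pending) {
        f.get();
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      for (ExecutorService p : pools) {
        p.shutdown();
      }
    }
    return threadIds.size();
  }

  public static void main(String[] args) {
    System.out.println("shared pool:      " + distinctWorkerThreads(5, true));
    System.out.println("pool per channel: " + distinctWorkerThreads(5, false));
  }
}
```

   In this sketch the shared pool never uses more than its 2 threads regardless of the number of channels, while the per-channel variant starts at least one new thread for each channel. Whether the same reasoning settles the question for `chunkFetchWorkers` depends on how the PR sizes and scopes that group, which this excerpt does not show.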
