You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to reviews@spark.apache.org by "mridulm (via GitHub)" <gi...@apache.org> on 2023/07/31 05:52:19 UTC

[GitHub] [spark] mridulm commented on a diff in pull request #41489: [SPARK-43987][Shuffle] Separate finalizeShuffleMerge Processing to Dedicated Thread Pools

mridulm commented on code in PR #41489:
URL: https://github.com/apache/spark/pull/41489#discussion_r1278808341


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleTransportContext.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.protocol.Message;
+import org.apache.spark.network.protocol.MessageDecoder;
+import org.apache.spark.network.protocol.RpcRequest;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportChannelHandler;
+import org.apache.spark.network.server.TransportRequestHandler;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.util.IOMode;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+public class ShuffleTransportContext extends TransportContext {
+  private static final Logger logger = LoggerFactory.getLogger(ShuffleTransportContext.class);
+  private final EventLoopGroup finalizeWorkers;
+
+  public ShuffleTransportContext(
+    TransportConf conf,
+    ExternalBlockHandler rpcHandler,
+    boolean closeIdleConnections) {
+    this(conf, rpcHandler, closeIdleConnections, false);
+  }
+
+  public ShuffleTransportContext(TransportConf conf,
+      RpcHandler rpcHandler,
+      boolean closeIdleConnections,
+      boolean isClientOnly) {
+    super(conf, rpcHandler, closeIdleConnections, isClientOnly);
+
+    if (conf.getModuleName() != null &&
+      conf.getModuleName().equalsIgnoreCase("shuffle") && conf.separateFinalizeShuffleMerge()) {

Review Comment:
   super nit:
   ```suggestion
       if ("shuffle".equalsIgnoreCase(conf.getModuleName()) && conf.separateFinalizeShuffleMerge()) {
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleTransportContext.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.protocol.Message;
+import org.apache.spark.network.protocol.MessageDecoder;
+import org.apache.spark.network.protocol.RpcRequest;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportChannelHandler;
+import org.apache.spark.network.server.TransportRequestHandler;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.util.IOMode;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+public class ShuffleTransportContext extends TransportContext {
+  private static final Logger logger = LoggerFactory.getLogger(ShuffleTransportContext.class);
+  private final EventLoopGroup finalizeWorkers;
+
+  public ShuffleTransportContext(
+    TransportConf conf,
+    ExternalBlockHandler rpcHandler,
+    boolean closeIdleConnections) {
+    this(conf, rpcHandler, closeIdleConnections, false);
+  }
+
+  public ShuffleTransportContext(TransportConf conf,
+      RpcHandler rpcHandler,
+      boolean closeIdleConnections,
+      boolean isClientOnly) {
+    super(conf, rpcHandler, closeIdleConnections, isClientOnly);
+
+    if (conf.getModuleName() != null &&
+      conf.getModuleName().equalsIgnoreCase("shuffle") && conf.separateFinalizeShuffleMerge()) {
+      finalizeWorkers = NettyUtils.createEventLoop(
+        IOMode.valueOf(conf.ioMode()),
+        conf.finalizeShuffleMergeHandlerThreads(),
+        "shuffle-finalize-merge-handler");
+      logger.info("finalize shuffle merged workers created");
+    } else {
+      finalizeWorkers = null;
+    }
+  }
+
+  @Override
+  public TransportChannelHandler initializePipeline(SocketChannel channel) {
+    TransportChannelHandler ch = super.initializePipeline(channel);
+    addFinalizeHandlerToPipelineIfNeeded(channel, ch);
+    return ch;
+  }
+
+  @Override
+  public TransportChannelHandler initializePipeline(SocketChannel channel,
+      RpcHandler channelRpcHandler) {
+    TransportChannelHandler ch = super.initializePipeline(channel, channelRpcHandler);
+    addFinalizeHandlerToPipelineIfNeeded(channel, ch);
+    return ch;
+  }
+
+  /**
+   * Add finalize handler to pipeline if needed. This is needed only when separateFinalizeShuffleMerge
+   * is enabled.
+   */
+  private void addFinalizeHandlerToPipelineIfNeeded(SocketChannel channel,
+      TransportChannelHandler ch) {
+    if (finalizeWorkers != null) {
+      channel.pipeline().addLast(finalizeWorkers, FinalizedHandler.HANDLER_NAME,
+        new FinalizedHandler(ch.getRequestHandler()));
+    }
+  }
+
+  @Override
+  protected MessageToMessageDecoder<ByteBuf> getDecoder() {
+    if (finalizeWorkers != null) {
+      return new ShuffleMessageDecoder(MessageDecoder.INSTANCE);

Review Comment:
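   `getDecoder()` is invoked for each pipeline. Make the `ShuffleMessageDecoder` a static final field of this class instead of creating a new instance each time. Note that the shared instance must then be annotated `@ChannelHandler.Sharable`, because Netty refuses to add a non-sharable handler to more than one pipeline.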



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleTransportContext.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.protocol.Message;
+import org.apache.spark.network.protocol.MessageDecoder;
+import org.apache.spark.network.protocol.RpcRequest;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportChannelHandler;
+import org.apache.spark.network.server.TransportRequestHandler;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.util.IOMode;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+public class ShuffleTransportContext extends TransportContext {
+  private static final Logger logger = LoggerFactory.getLogger(ShuffleTransportContext.class);
+  private final EventLoopGroup finalizeWorkers;
+
+  public ShuffleTransportContext(
+    TransportConf conf,
+    ExternalBlockHandler rpcHandler,
+    boolean closeIdleConnections) {
+    this(conf, rpcHandler, closeIdleConnections, false);
+  }
+
+  public ShuffleTransportContext(TransportConf conf,
+      RpcHandler rpcHandler,
+      boolean closeIdleConnections,
+      boolean isClientOnly) {
+    super(conf, rpcHandler, closeIdleConnections, isClientOnly);
+
+    if (conf.getModuleName() != null &&
+      conf.getModuleName().equalsIgnoreCase("shuffle") && conf.separateFinalizeShuffleMerge()) {
+      finalizeWorkers = NettyUtils.createEventLoop(
+        IOMode.valueOf(conf.ioMode()),
+        conf.finalizeShuffleMergeHandlerThreads(),
+        "shuffle-finalize-merge-handler");
+      logger.info("finalize shuffle merged workers created");
+    } else {
+      finalizeWorkers = null;
+    }
+  }
+
+  @Override
+  public TransportChannelHandler initializePipeline(SocketChannel channel) {
+    TransportChannelHandler ch = super.initializePipeline(channel);
+    addFinalizeHandlerToPipelineIfNeeded(channel, ch);
+    return ch;
+  }
+
+  @Override
+  public TransportChannelHandler initializePipeline(SocketChannel channel,
+      RpcHandler channelRpcHandler) {
+    TransportChannelHandler ch = super.initializePipeline(channel, channelRpcHandler);
+    addFinalizeHandlerToPipelineIfNeeded(channel, ch);
+    return ch;
+  }
+
+  /**
+   * Add finalize handler to pipeline if needed. This is needed only when separateFinalizeShuffleMerge
+   * is enabled.
+   */
+  private void addFinalizeHandlerToPipelineIfNeeded(SocketChannel channel,
+      TransportChannelHandler ch) {
+    if (finalizeWorkers != null) {
+      channel.pipeline().addLast(finalizeWorkers, FinalizedHandler.HANDLER_NAME,
+        new FinalizedHandler(ch.getRequestHandler()));
+    }
+  }
+
+  @Override
+  protected MessageToMessageDecoder<ByteBuf> getDecoder() {
+    if (finalizeWorkers != null) {
+      return new ShuffleMessageDecoder(MessageDecoder.INSTANCE);
+    }
+    return super.getDecoder();
+  }
+
+  static class ShuffleMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
+
+    private final MessageDecoder delegate;
+    public ShuffleMessageDecoder(MessageDecoder delegate) {
+      super();
+      this.delegate = delegate;
+    }
+
+    /**
+     * Decode the message and check if it is a finalize merge request. If yes, then create a
+     * internal rpc request message and add it to the list of messages to be handled by
+     * {@link TransportChannelHandler}
+    */
+    @Override
+    protected void decode(ChannelHandlerContext channelHandlerContext,
+        ByteBuf byteBuf,
+        List<Object> list) throws Exception {
+      delegate.decode(channelHandlerContext, byteBuf, list);
+      Object msg = list.get(list.size() - 1);
+      if (msg instanceof RpcRequest) {
+        RpcRequest req = (RpcRequest) msg;
+        ByteBuffer buffer = req.body().nioByteBuffer();
+        byte type = Unpooled.wrappedBuffer(buffer).readByte();
+        if (type == BlockTransferMessage.Type.FINALIZE_SHUFFLE_MERGE.id()) {
+          list.remove(list.size() - 1);
+          RpcRequestInternal rpcRequestInternal =
+            new RpcRequestInternal(BlockTransferMessage.Type.FINALIZE_SHUFFLE_MERGE, req);
+          logger.trace("Created internal rpc request msg with rpcId {} for finalize merge req",
+            req.requestId);
+          list.add(rpcRequestInternal);
+        }
+      }
+    }
+  }
+
+  /**
+   * Internal message to handle rpc requests that should not be accepted by
+   * {@link TransportChannelHandler}. Since, this message doesn't extend {@link Message}, it will
+   * not be accepted by {@link TransportChannelHandler}.

Review Comment:
   ```suggestion
      * Internal message used to handle rpc requests. It is not accepted by
      * {@link TransportChannelHandler} because it doesn't extend {@link Message}; instead, it is
      * accepted by {@link FinalizedHandler}, which is configured to execute in a separate
      * EventLoopGroup.
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleTransportContext.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.protocol.Message;
+import org.apache.spark.network.protocol.MessageDecoder;
+import org.apache.spark.network.protocol.RpcRequest;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportChannelHandler;
+import org.apache.spark.network.server.TransportRequestHandler;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.util.IOMode;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+public class ShuffleTransportContext extends TransportContext {
+  private static final Logger logger = LoggerFactory.getLogger(ShuffleTransportContext.class);
+  private final EventLoopGroup finalizeWorkers;
+
+  public ShuffleTransportContext(
+    TransportConf conf,
+    ExternalBlockHandler rpcHandler,
+    boolean closeIdleConnections) {
+    this(conf, rpcHandler, closeIdleConnections, false);
+  }
+
+  public ShuffleTransportContext(TransportConf conf,
+      RpcHandler rpcHandler,
+      boolean closeIdleConnections,
+      boolean isClientOnly) {
+    super(conf, rpcHandler, closeIdleConnections, isClientOnly);
+
+    if (conf.getModuleName() != null &&
+      conf.getModuleName().equalsIgnoreCase("shuffle") && conf.separateFinalizeShuffleMerge()) {
+      finalizeWorkers = NettyUtils.createEventLoop(
+        IOMode.valueOf(conf.ioMode()),
+        conf.finalizeShuffleMergeHandlerThreads(),
+        "shuffle-finalize-merge-handler");
+      logger.info("finalize shuffle merged workers created");
+    } else {
+      finalizeWorkers = null;
+    }
+  }
+
+  @Override
+  public TransportChannelHandler initializePipeline(SocketChannel channel) {
+    TransportChannelHandler ch = super.initializePipeline(channel);
+    addFinalizeHandlerToPipelineIfNeeded(channel, ch);
+    return ch;
+  }
+
+  @Override
+  public TransportChannelHandler initializePipeline(SocketChannel channel,
+      RpcHandler channelRpcHandler) {
+    TransportChannelHandler ch = super.initializePipeline(channel, channelRpcHandler);
+    addFinalizeHandlerToPipelineIfNeeded(channel, ch);
+    return ch;
+  }
+
+  /**
+   * Add finalize handler to pipeline if needed. This is needed only when separateFinalizeShuffleMerge
+   * is enabled.
+   */
+  private void addFinalizeHandlerToPipelineIfNeeded(SocketChannel channel,
+      TransportChannelHandler ch) {
+    if (finalizeWorkers != null) {
+      channel.pipeline().addLast(finalizeWorkers, FinalizedHandler.HANDLER_NAME,
+        new FinalizedHandler(ch.getRequestHandler()));
+    }
+  }
+
+  @Override
+  protected MessageToMessageDecoder<ByteBuf> getDecoder() {
+    if (finalizeWorkers != null) {
+      return new ShuffleMessageDecoder(MessageDecoder.INSTANCE);
+    }
+    return super.getDecoder();
+  }
+
+  static class ShuffleMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
+
+    private final MessageDecoder delegate;
+    public ShuffleMessageDecoder(MessageDecoder delegate) {
+      super();
+      this.delegate = delegate;
+    }
+
+    /**
+     * Decode the message and check if it is a finalize merge request. If yes, then create a
+     * internal rpc request message and add it to the list of messages to be handled by
+     * {@link TransportChannelHandler}
+    */
+    @Override
+    protected void decode(ChannelHandlerContext channelHandlerContext,
+        ByteBuf byteBuf,
+        List<Object> list) throws Exception {
+      delegate.decode(channelHandlerContext, byteBuf, list);
+      Object msg = list.get(list.size() - 1);
+      if (msg instanceof RpcRequest) {
+        RpcRequest req = (RpcRequest) msg;
+        ByteBuffer buffer = req.body().nioByteBuffer();
+        byte type = Unpooled.wrappedBuffer(buffer).readByte();
+        if (type == BlockTransferMessage.Type.FINALIZE_SHUFFLE_MERGE.id()) {
+          list.remove(list.size() - 1);
+          RpcRequestInternal rpcRequestInternal =
+            new RpcRequestInternal(BlockTransferMessage.Type.FINALIZE_SHUFFLE_MERGE, req);
+          logger.trace("Created internal rpc request msg with rpcId {} for finalize merge req",
+            req.requestId);
+          list.add(rpcRequestInternal);
+        }
+      }
+    }
+  }
+
+  /**
+   * Internal message to handle rpc requests that should not be accepted by
+   * {@link TransportChannelHandler}. Since, this message doesn't extend {@link Message}, it will
+   * not be accepted by {@link TransportChannelHandler}.
+   */
+  static class RpcRequestInternal {
+    public final BlockTransferMessage.Type messageType;
+    public final RpcRequest rpcRequest;
+
+    public RpcRequestInternal(BlockTransferMessage.Type messageType,
+        RpcRequest rpcRequest) {
+      this.messageType = messageType;
+      this.rpcRequest = rpcRequest;
+    }
+  }
+
+  static class FinalizedHandler extends SimpleChannelInboundHandler<RpcRequestInternal> {
+    private static final Logger logger = LoggerFactory.getLogger(FinalizedHandler.class);
+    public static final String HANDLER_NAME = "finalizeHandler";
+    private final TransportRequestHandler transportRequestHandler;
+
+    @Override
+    public boolean acceptInboundMessage(Object msg) throws Exception {
+      if (msg instanceof RpcRequestInternal) {
+        RpcRequestInternal rpcRequestInternal = (RpcRequestInternal) msg;
+        return rpcRequestInternal.messageType == BlockTransferMessage.Type.FINALIZE_SHUFFLE_MERGE;
+      }
+      return false;
+    }
+
+    public FinalizedHandler(TransportRequestHandler transportRequestHandler) {
+      this.transportRequestHandler = transportRequestHandler;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
+        RpcRequestInternal req) throws Exception {
+      logger.debug("finalize handler invoked for rpc request {}", req.rpcRequest.requestId);

Review Comment:
   Include additional context (such as the remote address of the client) in this log message, something like:
   
   ```suggestion
         if (logger.isTraceEnabled()) {
           logger.trace("Finalize shuffle req from {} for rpc request {}",
                   getRemoteAddress(channelHandlerContext.channel()), req.rpcRequest.requestId);
         }
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleTransportContext.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.protocol.Message;
+import org.apache.spark.network.protocol.MessageDecoder;
+import org.apache.spark.network.protocol.RpcRequest;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportChannelHandler;
+import org.apache.spark.network.server.TransportRequestHandler;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.util.IOMode;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+public class ShuffleTransportContext extends TransportContext {

Review Comment:
   Please add a class-level Javadoc for `ShuffleTransportContext` explaining its purpose.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleTransportContext.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.protocol.Message;
+import org.apache.spark.network.protocol.MessageDecoder;
+import org.apache.spark.network.protocol.RpcRequest;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportChannelHandler;
+import org.apache.spark.network.server.TransportRequestHandler;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.util.IOMode;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+public class ShuffleTransportContext extends TransportContext {
+  private static final Logger logger = LoggerFactory.getLogger(ShuffleTransportContext.class);
+  private final EventLoopGroup finalizeWorkers;
+
+  public ShuffleTransportContext(
+    TransportConf conf,
+    ExternalBlockHandler rpcHandler,
+    boolean closeIdleConnections) {
+    this(conf, rpcHandler, closeIdleConnections, false);
+  }
+
+  public ShuffleTransportContext(TransportConf conf,
+      RpcHandler rpcHandler,
+      boolean closeIdleConnections,
+      boolean isClientOnly) {
+    super(conf, rpcHandler, closeIdleConnections, isClientOnly);
+
+    if (conf.getModuleName() != null &&
+      conf.getModuleName().equalsIgnoreCase("shuffle") && conf.separateFinalizeShuffleMerge()) {
+      finalizeWorkers = NettyUtils.createEventLoop(
+        IOMode.valueOf(conf.ioMode()),
+        conf.finalizeShuffleMergeHandlerThreads(),
+        "shuffle-finalize-merge-handler");
+      logger.info("finalize shuffle merged workers created");
+    } else {
+      finalizeWorkers = null;
+    }
+  }
+
+  @Override
+  public TransportChannelHandler initializePipeline(SocketChannel channel) {
+    TransportChannelHandler ch = super.initializePipeline(channel);
+    addFinalizeHandlerToPipelineIfNeeded(channel, ch);
+    return ch;
+  }
+
+  @Override
+  public TransportChannelHandler initializePipeline(SocketChannel channel,
+      RpcHandler channelRpcHandler) {
+    TransportChannelHandler ch = super.initializePipeline(channel, channelRpcHandler);
+    addFinalizeHandlerToPipelineIfNeeded(channel, ch);
+    return ch;
+  }
+
+  /**
+   * Add finalize handler to pipeline if needed. This is needed only when separateFinalizeShuffleMerge
+   * is enabled.
+   */
+  private void addFinalizeHandlerToPipelineIfNeeded(SocketChannel channel,

Review Comment:
   Rename this to `addHandlersToPipeline` or something similar.
   Currently it is limited to finalization, but other expensive I/O operations may be handled here in the future as well.



##########
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java:
##########
@@ -324,6 +326,33 @@ public boolean separateChunkFetchRequest() {
     return conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0) > 0;
   }
 
+  /**
+   * Percentage of io.serverThreads used by netty to process FinalizeShuffleMerge. When the config
+   * `spark.shuffle.server.finalizeShuffleMergeThreadsPercent` is set, shuffle server will use a
+   * separate EventLoopGroup to process FinalizeShuffleMerge messages, which are I/O intensive and
+   * could take long time to process due to disk contentions. The number of threads used for handling
+   * finalizeShuffleMerge requests are percentage of io.serverThreads (if defined) else it is a
+   * percentage of 2 * #cores.
+   */
+  public int finalizeShuffleMergeHandlerThreads() {
+    if (!this.getModuleName().equalsIgnoreCase("shuffle")) {
+      return 0;
+    }
+    int finalizeShuffleMergeThreadsPercent =
+        Integer.parseInt(conf.get("spark.shuffle.server.finalizeShuffleMergeThreadsPercent"));

Review Comment:
   Currently, this method should not be called when `spark.shuffle.server.finalizeShuffleMergeThreadsPercent` is not specified: as written, it fails at runtime if the key is absent. That guarantee holds only because of the current code flow, which is brittle as the project evolves.
   
   Instead, handle this contract explicitly, either by:
   a) adding a precondition check that `spark.shuffle.server.finalizeShuffleMergeThreadsPercent` is present, or
   b) (preferably) handling the case where `spark.shuffle.server.finalizeShuffleMergeThreadsPercent` is missing.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org