You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "otterc (via GitHub)" <gi...@apache.org> on 2023/08/09 04:28:08 UTC

[GitHub] [spark] otterc commented on a diff in pull request #41489: [SPARK-43987][Shuffle] Separate finalizeShuffleMerge Processing to Dedicated Thread Pools

otterc commented on code in PR #41489:
URL: https://github.com/apache/spark/pull/41489#discussion_r1287913820


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleTransportContext.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.protocol.Message;
+import org.apache.spark.network.protocol.MessageDecoder;
+import org.apache.spark.network.protocol.RpcRequest;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportChannelHandler;
+import org.apache.spark.network.server.TransportRequestHandler;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.util.IOMode;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
+
+/**
+ * Extends {@link TransportContext} to support a customized shuffle service. Specifically, it
+ * modifies the Netty channel pipeline so that IO-expensive messages such as
+ * FINALIZE_SHUFFLE_MERGE are processed in separate handlers.
+ */
+public class ShuffleTransportContext extends TransportContext {
+  private static final Logger logger = LoggerFactory.getLogger(ShuffleTransportContext.class);
+  private static final ShuffleMessageDecoder SHUFFLE_DECODER =
+      new ShuffleMessageDecoder(MessageDecoder.INSTANCE);
+  private final EventLoopGroup finalizeWorkers;
+
+  public ShuffleTransportContext(
+    TransportConf conf,
+    ExternalBlockHandler rpcHandler,
+    boolean closeIdleConnections) {
+    this(conf, rpcHandler, closeIdleConnections, false);
+  }
+
+  public ShuffleTransportContext(TransportConf conf,
+      RpcHandler rpcHandler,
+      boolean closeIdleConnections,
+      boolean isClientOnly) {
+    super(conf, rpcHandler, closeIdleConnections, isClientOnly);
+
+    if ("shuffle".equalsIgnoreCase(conf.getModuleName()) && conf.separateFinalizeShuffleMerge()) {
+      finalizeWorkers = NettyUtils.createEventLoop(
+        IOMode.valueOf(conf.ioMode()),

Review Comment:
   Nit: indentation



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleTransportContext.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.protocol.Message;
+import org.apache.spark.network.protocol.MessageDecoder;
+import org.apache.spark.network.protocol.RpcRequest;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportChannelHandler;
+import org.apache.spark.network.server.TransportRequestHandler;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.util.IOMode;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
+
+/**
+ * Extends {@link TransportContext} to support a customized shuffle service. Specifically, it
+ * modifies the Netty channel pipeline so that IO-expensive messages such as
+ * FINALIZE_SHUFFLE_MERGE are processed in separate handlers.
+ */
+public class ShuffleTransportContext extends TransportContext {
+  private static final Logger logger = LoggerFactory.getLogger(ShuffleTransportContext.class);
+  private static final ShuffleMessageDecoder SHUFFLE_DECODER =
+      new ShuffleMessageDecoder(MessageDecoder.INSTANCE);
+  private final EventLoopGroup finalizeWorkers;
+
+  public ShuffleTransportContext(
+    TransportConf conf,
+    ExternalBlockHandler rpcHandler,
+    boolean closeIdleConnections) {
+    this(conf, rpcHandler, closeIdleConnections, false);
+  }
+
+  public ShuffleTransportContext(TransportConf conf,
+      RpcHandler rpcHandler,
+      boolean closeIdleConnections,
+      boolean isClientOnly) {
+    super(conf, rpcHandler, closeIdleConnections, isClientOnly);
+
+    if ("shuffle".equalsIgnoreCase(conf.getModuleName()) && conf.separateFinalizeShuffleMerge()) {
+      finalizeWorkers = NettyUtils.createEventLoop(
+        IOMode.valueOf(conf.ioMode()),
+        conf.finalizeShuffleMergeHandlerThreads(),
+        "shuffle-finalize-merge-handler");
+      logger.info("finalize shuffle merged workers created");
+    } else {
+      finalizeWorkers = null;
+    }
+  }
+
+  @Override
+  public TransportChannelHandler initializePipeline(SocketChannel channel) {
+    TransportChannelHandler ch = super.initializePipeline(channel);
+    addHandlerToPipeline(channel, ch);
+    return ch;
+  }
+
+  @Override
+  public TransportChannelHandler initializePipeline(SocketChannel channel,
+      RpcHandler channelRpcHandler) {
+    TransportChannelHandler ch = super.initializePipeline(channel, channelRpcHandler);
+    addHandlerToPipeline(channel, ch);
+    return ch;
+  }
+
+  /**
+   * Add finalize handler to pipeline if needed. This is needed only when
+   * separateFinalizeShuffleMerge is enabled.
+   */
+  private void addHandlerToPipeline(SocketChannel channel,
+      TransportChannelHandler ch) {

Review Comment:
   Nit: rename `ch` to `transportChannelHandler`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org