You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/03/26 04:51:56 UTC
[spark] branch branch-3.0 updated: [SPARK-30623][CORE] Spark
external shuffle allow disable of separate event loop group
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new a9a185c [SPARK-30623][CORE] Spark external shuffle allow disable of separate event loop group
a9a185c is described below
commit a9a185c918678285838599b6d05087212d727c06
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Thu Mar 26 12:37:48 2020 +0800
[SPARK-30623][CORE] Spark external shuffle allow disable of separate event loop group
### What changes were proposed in this pull request?
Fix the regression caused by #22173.
The original PR changes the logic of handling `ChunkFetchRequest` from async to sync, which causes the shuffle benchmark regression. This PR fixes the regression back to the async mode by reusing the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent`.
When the user sets the config, ChunkFetchRequest will be processed in a separate event loop group; otherwise, the code path is exactly the same as before.
### Why are the changes needed?
Fix the shuffle performance regression described in https://github.com/apache/spark/pull/22173#issuecomment-572459561
### Does this PR introduce any user-facing change?
Yes, this PR disables the separate event loop for ChunkFetchRequest by default.
### How was this patch tested?
Existing UT.
Closes #27665 from xuanyuanking/SPARK-24355-follow.
Authored-by: Yuanjian Li <xy...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 0fe203e7032a326ff0c78f6660ab1b09409fe09d)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/network/TransportContext.java | 27 +++++++++---------
.../network/server/ChunkFetchRequestHandler.java | 33 ++++++++++++++++------
.../network/server/TransportChannelHandler.java | 5 +++-
.../network/server/TransportRequestHandler.java | 13 +++++++--
.../apache/spark/network/util/TransportConf.java | 13 +++++++--
.../network/ChunkFetchRequestHandlerSuite.java | 4 +--
.../network/TransportRequestHandlerSuite.java | 4 +--
7 files changed, 66 insertions(+), 33 deletions(-)
diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
index d99b9bd..a0de9df 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -123,7 +123,7 @@ public class TransportContext implements Closeable {
if (conf.getModuleName() != null &&
conf.getModuleName().equalsIgnoreCase("shuffle") &&
- !isClientOnly) {
+ !isClientOnly && conf.separateChunkFetchRequest()) {
chunkFetchWorkers = NettyUtils.createEventLoop(
IOMode.valueOf(conf.ioMode()),
conf.chunkFetchHandlerThreads(),
@@ -187,8 +187,6 @@ public class TransportContext implements Closeable {
RpcHandler channelRpcHandler) {
try {
TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
- ChunkFetchRequestHandler chunkFetchHandler =
- createChunkFetchHandler(channelHandler, channelRpcHandler);
ChannelPipeline pipeline = channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
@@ -200,6 +198,9 @@ public class TransportContext implements Closeable {
.addLast("handler", channelHandler);
// Use a separate EventLoopGroup to handle ChunkFetchRequest messages for shuffle rpcs.
if (chunkFetchWorkers != null) {
+ ChunkFetchRequestHandler chunkFetchHandler = new ChunkFetchRequestHandler(
+ channelHandler.getClient(), rpcHandler.getStreamManager(),
+ conf.maxChunksBeingTransferred(), true /* syncModeEnabled */);
pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler);
}
return channelHandler;
@@ -217,19 +218,17 @@ public class TransportContext implements Closeable {
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
TransportClient client = new TransportClient(channel, responseHandler);
+ boolean separateChunkFetchRequest = conf.separateChunkFetchRequest();
+ ChunkFetchRequestHandler chunkFetchRequestHandler = null;
+ if (!separateChunkFetchRequest) {
+ chunkFetchRequestHandler = new ChunkFetchRequestHandler(
+ client, rpcHandler.getStreamManager(),
+ conf.maxChunksBeingTransferred(), false /* syncModeEnabled */);
+ }
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
- rpcHandler, conf.maxChunksBeingTransferred());
+ rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler);
return new TransportChannelHandler(client, responseHandler, requestHandler,
- conf.connectionTimeoutMs(), closeIdleConnections, this);
- }
-
- /**
- * Creates the dedicated ChannelHandler for ChunkFetchRequest messages.
- */
- private ChunkFetchRequestHandler createChunkFetchHandler(TransportChannelHandler channelHandler,
- RpcHandler rpcHandler) {
- return new ChunkFetchRequestHandler(channelHandler.getClient(),
- rpcHandler.getStreamManager(), conf.maxChunksBeingTransferred());
+ conf.connectionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this);
}
public TransportConf getConf() { return conf; }
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
index 94412c4..82810da 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
@@ -55,14 +55,17 @@ public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkF
private final StreamManager streamManager;
/** The max number of chunks being transferred and not finished yet. */
private final long maxChunksBeingTransferred;
+ private final boolean syncModeEnabled;
public ChunkFetchRequestHandler(
TransportClient client,
StreamManager streamManager,
- Long maxChunksBeingTransferred) {
+ Long maxChunksBeingTransferred,
+ boolean syncModeEnabled) {
this.client = client;
this.streamManager = streamManager;
this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+ this.syncModeEnabled = syncModeEnabled;
}
@Override
@@ -76,6 +79,11 @@ public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkF
ChannelHandlerContext ctx,
final ChunkFetchRequest msg) throws Exception {
Channel channel = ctx.channel();
+ processFetchRequest(channel, msg);
+ }
+
+ public void processFetchRequest(
+ final Channel channel, final ChunkFetchRequest msg) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
msg.streamChunkId);
@@ -112,19 +120,26 @@ public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkF
* channel will be handled by the EventLoop the channel is registered to. So even
* though we are processing the ChunkFetchRequest in a separate thread pool, the actual I/O,
* which is the potentially blocking call that could deplete server handler threads, is still
- * being processed by TransportServer's default EventLoopGroup. In order to throttle the max
- * number of threads that channel I/O for sending response to ChunkFetchRequest, the thread
- * calling channel.writeAndFlush will wait for the completion of sending response back to
- * client by invoking await(). This will throttle the rate at which threads from
- * ChunkFetchRequest dedicated EventLoopGroup submit channel I/O requests to TransportServer's
- * default EventLoopGroup, thus making sure that we can reserve some threads in
- * TransportServer's default EventLoopGroup for handling other RPC messages.
+ * being processed by TransportServer's default EventLoopGroup.
+ *
+ * When syncModeEnabled is true, Spark will throttle the max number of threads that channel I/O
+ * for sending response to ChunkFetchRequest, the thread calling channel.writeAndFlush will wait
+ * for the completion of sending response back to client by invoking await(). This will throttle
+ * the rate at which threads from ChunkFetchRequest dedicated EventLoopGroup submit channel I/O
+ * requests to TransportServer's default EventLoopGroup, thus making sure that we can reserve
+ * some threads in TransportServer's default EventLoopGroup for handling other RPC messages.
*/
private ChannelFuture respond(
final Channel channel,
final Encodable result) throws InterruptedException {
final SocketAddress remoteAddress = channel.remoteAddress();
- return channel.writeAndFlush(result).await().addListener((ChannelFutureListener) future -> {
+ ChannelFuture channelFuture;
+ if (syncModeEnabled) {
+ channelFuture = channel.writeAndFlush(result).await();
+ } else {
+ channelFuture = channel.writeAndFlush(result);
+ }
+ return channelFuture.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index 31371f6..e53a0c1 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -58,6 +58,7 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
private final TransportRequestHandler requestHandler;
private final long requestTimeoutNs;
private final boolean closeIdleConnections;
+ private final boolean skipChunkFetchRequest;
private final TransportContext transportContext;
public TransportChannelHandler(
@@ -65,12 +66,14 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
TransportResponseHandler responseHandler,
TransportRequestHandler requestHandler,
long requestTimeoutMs,
+ boolean skipChunkFetchRequest,
boolean closeIdleConnections,
TransportContext transportContext) {
this.client = client;
this.responseHandler = responseHandler;
this.requestHandler = requestHandler;
this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
+ this.skipChunkFetchRequest = skipChunkFetchRequest;
this.closeIdleConnections = closeIdleConnections;
this.transportContext = transportContext;
}
@@ -124,7 +127,7 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
*/
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
- if (msg instanceof ChunkFetchRequest) {
+ if (skipChunkFetchRequest && msg instanceof ChunkFetchRequest) {
return false;
} else {
return super.acceptInboundMessage(msg);
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 0792b58..f178928 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -62,16 +62,21 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
/** The max number of chunks being transferred and not finished yet. */
private final long maxChunksBeingTransferred;
+ /** The dedicated ChannelHandler for ChunkFetchRequest messages. */
+ private final ChunkFetchRequestHandler chunkFetchRequestHandler;
+
public TransportRequestHandler(
Channel channel,
TransportClient reverseClient,
RpcHandler rpcHandler,
- Long maxChunksBeingTransferred) {
+ Long maxChunksBeingTransferred,
+ ChunkFetchRequestHandler chunkFetchRequestHandler) {
this.channel = channel;
this.reverseClient = reverseClient;
this.rpcHandler = rpcHandler;
this.streamManager = rpcHandler.getStreamManager();
this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+ this.chunkFetchRequestHandler = chunkFetchRequestHandler;
}
@Override
@@ -97,8 +102,10 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
}
@Override
- public void handle(RequestMessage request) {
- if (request instanceof RpcRequest) {
+ public void handle(RequestMessage request) throws Exception {
+ if (request instanceof ChunkFetchRequest) {
+ chunkFetchRequestHandler.processFetchRequest(channel, (ChunkFetchRequest) request);
+ } else if (request instanceof RpcRequest) {
processRpcRequest((RpcRequest) request);
} else if (request instanceof OneWayMessage) {
processOneWayMessage((OneWayMessage) request);
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index cc0f291..6c37f9a 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -316,7 +316,8 @@ public class TransportConf {
/**
* Percentage of io.serverThreads used by netty to process ChunkFetchRequest.
- * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages.
+ * When the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent` is set,
+ * shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages.
* Although when calling the async writeAndFlush on the underlying channel to send
* response back to client, the I/O on the channel is still being handled by
* {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup
@@ -339,13 +340,21 @@ public class TransportConf {
return 0;
}
int chunkFetchHandlerThreadsPercent =
- conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 100);
+ Integer.parseInt(conf.get("spark.shuffle.server.chunkFetchHandlerThreadsPercent"));
int threads =
this.serverThreads() > 0 ? this.serverThreads() : 2 * NettyRuntime.availableProcessors();
return (int) Math.ceil(threads * (chunkFetchHandlerThreadsPercent / 100.0));
}
/**
+ * Whether to use a separate EventLoopGroup to process ChunkFetchRequest messages, it is decided
+ * by the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent` is set or not.
+ */
+ public boolean separateChunkFetchRequest() {
+ return conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0) > 0;
+ }
+
+ /**
* Whether to use the old protocol while doing the shuffle block fetching.
* It is only enabled while we need the compatibility in the scenario of new spark version
* job fetching blocks from old version external shuffle service.
diff --git a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java
index 7e30ed4..addb4ff 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import io.netty.channel.Channel;
-import org.apache.spark.network.server.ChunkFetchRequestHandler;
import org.junit.Assert;
import org.junit.Test;
@@ -33,6 +32,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.protocol.*;
+import org.apache.spark.network.server.ChunkFetchRequestHandler;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
@@ -68,7 +68,7 @@ public class ChunkFetchRequestHandlerSuite {
long streamId = streamManager.registerStream("test-app", managedBuffers.iterator(), channel);
TransportClient reverseClient = mock(TransportClient.class);
ChunkFetchRequestHandler requestHandler = new ChunkFetchRequestHandler(reverseClient,
- rpcHandler.getStreamManager(), 2L);
+ rpcHandler.getStreamManager(), 2L, false);
RequestMessage request0 = new ChunkFetchRequest(new StreamChunkId(streamId, 0));
requestHandler.channelRead(context, request0);
diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java
index a43a659..0a64471 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java
@@ -39,7 +39,7 @@ import org.apache.spark.network.server.TransportRequestHandler;
public class TransportRequestHandlerSuite {
@Test
- public void handleStreamRequest() {
+ public void handleStreamRequest() throws Exception {
RpcHandler rpcHandler = new NoOpRpcHandler();
OneForOneStreamManager streamManager = (OneForOneStreamManager) (rpcHandler.getStreamManager());
Channel channel = mock(Channel.class);
@@ -66,7 +66,7 @@ public class TransportRequestHandlerSuite {
TransportClient reverseClient = mock(TransportClient.class);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, reverseClient,
- rpcHandler, 2L);
+ rpcHandler, 2L, null);
RequestMessage request0 = new StreamRequest(String.format("%d_%d", streamId, 0));
requestHandler.handle(request0);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org