Posted to commits@spark.apache.org by mr...@apache.org on 2020/11/18 02:33:48 UTC
[spark] branch master updated: [SPARK-31069][CORE] Avoid repeated
computation of `chunksBeingTransferred` causing high CPU cost in external
shuffle service when `maxChunksBeingTransferred` uses the default value
This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f8b95dd [SPARK-31069][CORE] Avoid repeated computation of `chunksBeingTransferred` causing high CPU cost in external shuffle service when `maxChunksBeingTransferred` uses the default value
f8b95dd is described below
commit f8b95dddc1571194fd728d7e0c6de495895da99e
Author: angerszhu <an...@gmail.com>
AuthorDate: Tue Nov 17 20:32:20 2020 -0600
[SPARK-31069][CORE] Avoid repeated computation of `chunksBeingTransferred` causing high CPU cost in external shuffle service when `maxChunksBeingTransferred` uses the default value
### What changes were proposed in this pull request?
Follow-up to #27831; original author: chrysan.
On every request, the handler checks `chunksBeingTransferred`:
```
public long chunksBeingTransferred() {
  long sum = 0L;
  for (StreamState streamState : streams.values()) {
    sum += streamState.chunksBeingTransferred.get();
  }
  return sum;
}
```
for example:
```
long chunksBeingTransferred = streamManager.chunksBeingTransferred();
if (chunksBeingTransferred >= maxChunksBeingTransferred) {
  logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
    chunksBeingTransferred, maxChunksBeingTransferred);
  channel.close();
  return;
}
```
This traverses `streams` repeatedly, and fetching a data chunk accesses `streams` as well, which causes two problems:
1. repeated traversal of `streams`: the more streams are registered, the longer each check takes
2. lock contention on the `ConcurrentHashMap` `streams`
In this PR, when `maxChunksBeingTransferred` uses its default value, we skip computing `chunksBeingTransferred` entirely, since no finite count can exceed the default limit. Users who set this configuration and run into performance problems can also backport PR #27831.
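The shape of the fix can be sketched in isolation. The class and counter below (`GuardedLimitCheck`, `traversals`) are hypothetical instrumentation, not Spark code; the point is that the guard on `Long.MAX_VALUE` (the default limit) makes the expensive sum unreachable:

```java
// Sketch of the guarded check: when the limit is left at its default of
// Long.MAX_VALUE, the O(n) sum is never computed, because no finite
// count could exceed the limit anyway.
public class GuardedLimitCheck {
  static long maxChunksBeingTransferred = Long.MAX_VALUE; // the default
  static int traversals = 0; // counts how often the O(n) sum actually runs

  static long chunksBeingTransferred() {
    traversals++;
    return 0L; // stand-in for the real per-stream sum
  }

  // Returns true when the connection should be closed.
  static boolean overLimit() {
    if (maxChunksBeingTransferred < Long.MAX_VALUE) {
      return chunksBeingTransferred() >= maxChunksBeingTransferred;
    }
    return false; // default limit: skip the traversal entirely
  }

  public static void main(String[] args) {
    overLimit();                    // default limit: no traversal
    System.out.println(traversals); // prints 0
    maxChunksBeingTransferred = 100;
    overLimit();                    // explicit limit: traversal happens
    System.out.println(traversals); // prints 1
  }
}
```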
### Why are the changes needed?
Speeds up request handling by skipping the `chunksBeingTransferred` computation in the default configuration, and avoids lock contention on the `streams` map.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests.
Closes #30139 from AngersZhuuuu/SPARK-31069.
Lead-authored-by: angerszhu <an...@gmail.com>
Co-authored-by: chrysan <ch...@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../spark/network/server/ChunkFetchRequestHandler.java | 14 ++++++++------
.../spark/network/server/TransportRequestHandler.java | 14 ++++++++------
2 files changed, 16 insertions(+), 12 deletions(-)
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
index 82810da..9a71cf5 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
@@ -88,12 +88,14 @@ public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkF
logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
msg.streamChunkId);
}
- long chunksBeingTransferred = streamManager.chunksBeingTransferred();
- if (chunksBeingTransferred >= maxChunksBeingTransferred) {
- logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
- chunksBeingTransferred, maxChunksBeingTransferred);
- channel.close();
- return;
+ if (maxChunksBeingTransferred < Long.MAX_VALUE) {
+ long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+ if (chunksBeingTransferred >= maxChunksBeingTransferred) {
+ logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
+ chunksBeingTransferred, maxChunksBeingTransferred);
+ channel.close();
+ return;
+ }
}
ManagedBuffer buf;
try {
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index f178928..4a30f8d 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -124,12 +124,14 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
req.streamId);
}
- long chunksBeingTransferred = streamManager.chunksBeingTransferred();
- if (chunksBeingTransferred >= maxChunksBeingTransferred) {
- logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
- chunksBeingTransferred, maxChunksBeingTransferred);
- channel.close();
- return;
+ if (maxChunksBeingTransferred < Long.MAX_VALUE) {
+ long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+ if (chunksBeingTransferred >= maxChunksBeingTransferred) {
+ logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
+ chunksBeingTransferred, maxChunksBeingTransferred);
+ channel.close();
+ return;
+ }
}
ManagedBuffer buf;
try {