Posted to commits@spark.apache.org by mr...@apache.org on 2020/11/18 02:33:48 UTC

[spark] branch master updated: [SPARK-31069][CORE] Avoid repeatedly computing `chunksBeingTransferred`, which causes high CPU cost in the external shuffle service when `maxChunksBeingTransferred` uses the default value

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f8b95dd  [SPARK-31069][CORE] Avoid repeatedly computing `chunksBeingTransferred`, which causes high CPU cost in the external shuffle service when `maxChunksBeingTransferred` uses the default value
f8b95dd is described below

commit f8b95dddc1571194fd728d7e0c6de495895da99e
Author: angerszhu <an...@gmail.com>
AuthorDate: Tue Nov 17 20:32:20 2020 -0600

    [SPARK-31069][CORE] Avoid repeatedly computing `chunksBeingTransferred`, which causes high CPU cost in the external shuffle service when `maxChunksBeingTransferred` uses the default value
    
    ### What changes were proposed in this pull request?
    Follow-up of #27831; the original author is chrysan.
    
    On each request, the handler checks `chunksBeingTransferred`:
    ```
    public long chunksBeingTransferred() {
      long sum = 0L;
      for (StreamState streamState : streams.values()) {
        sum += streamState.chunksBeingTransferred.get();
      }
      return sum;
    }
    ```
    This is called from the request handlers, for example:
    ```
    long chunksBeingTransferred = streamManager.chunksBeingTransferred();
    if (chunksBeingTransferred >= maxChunksBeingTransferred) {
      logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
        chunksBeingTransferred, maxChunksBeingTransferred);
      channel.close();
      return;
    }
    ```
    This traverses `streams` on every request, and fetching a data chunk accesses `streams` as well. That causes two problems (see the sketch after this list):

    1. `streams` is traversed repeatedly, and the traversal time grows with the number of registered streams.
    2. Lock contention on the `ConcurrentHashMap` `streams`.
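
    To make problem 1 concrete, here is a minimal, self-contained sketch; the `StreamState` and `streams` below are simplified stand-ins for the stream manager's internals, not the actual Spark code:
    ```
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    class ChunkCountSketch {
      // Simplified stand-in for the stream registry kept by the stream manager.
      static class StreamState {
        final AtomicLong chunksBeingTransferred = new AtomicLong();
      }

      static final ConcurrentHashMap<Long, StreamState> streams = new ConcurrentHashMap<>();

      // O(n) in the number of registered streams, and it runs once per request.
      static long chunksBeingTransferred() {
        long sum = 0L;
        for (StreamState state : streams.values()) {
          sum += state.chunksBeingTransferred.get();
        }
        return sum;
      }

      public static void main(String[] args) {
        for (long i = 0; i < 100_000; i++) {
          streams.put(i, new StreamState());
        }
        long start = System.nanoTime();
        chunksBeingTransferred(); // the work done for a single incoming request
        System.out.printf("one check over %d streams took %d us%n",
          streams.size(), (System.nanoTime() - start) / 1_000);
      }
    }
    ```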
    
    In this PR, when `maxChunksBeingTransferred` is left at its default value, we skip computing `chunksBeingTransferred` entirely, since no limit needs to be enforced in that case. Users who set this configuration and run into performance problems can also backport PR #27831.
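
    For context, the limit comes from `TransportConf#maxChunksBeingTransferred`, which reads `spark.shuffle.maxChunksBeingTransferred` and falls back to `Long.MAX_VALUE`. A minimal sketch of why the new guard is safe (stand-in code, not the actual handler):
    ```
    class GuardSketch {
      public static void main(String[] args) {
        // The default of spark.shuffle.maxChunksBeingTransferred is Long.MAX_VALUE.
        long maxChunksBeingTransferred = Long.MAX_VALUE;
        // A real chunk count can never reach Long.MAX_VALUE, so with the default
        // the limit check can never fire and the O(n) sum is pure overhead.
        if (maxChunksBeingTransferred < Long.MAX_VALUE) {
          System.out.println("limit configured: compute the sum and enforce it");
        } else {
          System.out.println("default: skip streamManager.chunksBeingTransferred()");
        }
      }
    }
    ```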
    
    ### Why are the changes needed?
    Speeds up request handling by skipping the `chunksBeingTransferred` computation and avoiding lock contention on `streams`.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests.
    
    Closes #30139 from AngersZhuuuu/SPARK-31069.
    
    Lead-authored-by: angerszhu <an...@gmail.com>
    Co-authored-by: chrysan <ch...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../spark/network/server/ChunkFetchRequestHandler.java     | 14 ++++++++------
 .../spark/network/server/TransportRequestHandler.java      | 14 ++++++++------
 2 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
index 82810da..9a71cf5 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
@@ -88,12 +88,14 @@ public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkF
       logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
         msg.streamChunkId);
     }
-    long chunksBeingTransferred = streamManager.chunksBeingTransferred();
-    if (chunksBeingTransferred >= maxChunksBeingTransferred) {
-      logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
-        chunksBeingTransferred, maxChunksBeingTransferred);
-      channel.close();
-      return;
+    if (maxChunksBeingTransferred < Long.MAX_VALUE) {
+      long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+      if (chunksBeingTransferred >= maxChunksBeingTransferred) {
+        logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
+          chunksBeingTransferred, maxChunksBeingTransferred);
+        channel.close();
+        return;
+      }
     }
     ManagedBuffer buf;
     try {
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index f178928..4a30f8d 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -124,12 +124,14 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
         req.streamId);
     }
 
-    long chunksBeingTransferred = streamManager.chunksBeingTransferred();
-    if (chunksBeingTransferred >= maxChunksBeingTransferred) {
-      logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
-        chunksBeingTransferred, maxChunksBeingTransferred);
-      channel.close();
-      return;
+    if (maxChunksBeingTransferred < Long.MAX_VALUE) {
+      long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+      if (chunksBeingTransferred >= maxChunksBeingTransferred) {
+        logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
+          chunksBeingTransferred, maxChunksBeingTransferred);
+        channel.close();
+        return;
+      }
     }
     ManagedBuffer buf;
     try {

