You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/03/11 22:16:37 UTC

[spark] branch master updated: [SPARK-27073][CORE] Fix a race condition when handling of IdleStateEvent

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 064604a  [SPARK-27073][CORE] Fix a race condition when handling of IdleStateEvent
064604a is described below

commit 064604aaa768de4c33425249be9c73948b2aeac9
Author: sychen <sy...@ctrip.com>
AuthorDate: Mon Mar 11 15:16:16 2019 -0700

    [SPARK-27073][CORE] Fix a race condition when handling of IdleStateEvent
    
    ## What changes were proposed in this pull request?
    
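    When TransportChannelHandler processes an IdleStateEvent, it first checks whether the time since the last request exceeds the request timeout.
    If TransportClient.sendRpc issues a new request right after that check, TransportChannelHandler then sees responseHandler.numOutstandingRequests() > 0 and wrongly closes a healthy connection.
    This change samples numOutstandingRequests() before evaluating the timeout, so that a request issued concurrently with the idle check is not treated as an overdue outstanding request.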
    
    ## How was this patch tested?
    
    Closes #23989 from cxzl25/fix_IdleStateEvent_timeout.
    
    Authored-by: sychen <sy...@ctrip.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../java/org/apache/spark/network/client/TransportResponseHandler.java | 2 +-
 .../java/org/apache/spark/network/server/TransportChannelHandler.java  | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index 596b0ea..2f143f7 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -91,7 +91,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
   }
 
   public void addStreamCallback(String streamId, StreamCallback callback) {
-    timeOfLastRequestNs.set(System.nanoTime());
+    updateTimeOfLastRequest();
     streamCallbacks.offer(ImmutablePair.of(streamId, callback));
   }
 
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index ca81099..31371f6 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -155,10 +155,11 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
       // To avoid a race between TransportClientFactory.createClient() and this code which could
       // result in an inactive client being returned, this needs to run in a synchronized block.
       synchronized (this) {
+        boolean hasInFlightRequests = responseHandler.numOutstandingRequests() > 0;
         boolean isActuallyOverdue =
           System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
         if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
-          if (responseHandler.numOutstandingRequests() > 0) {
+          if (hasInFlightRequests) {
             String address = getRemoteAddress(ctx.channel());
             logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
               "requests. Assuming connection is dead; please adjust spark.network.timeout if " +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org