Posted to commits@spark.apache.org by mr...@apache.org on 2022/01/11 05:05:21 UTC

[spark] branch master updated: [SPARK-37847][CORE][SHUFFLE] PushBlockStreamCallback#isStale should check null to avoid NPE

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7463564  [SPARK-37847][CORE][SHUFFLE] PushBlockStreamCallback#isStale should check null to avoid NPE
7463564 is described below

commit 74635649b230fd76199efe3da4bd3e4112894c4a
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Mon Jan 10 23:04:39 2022 -0600

    [SPARK-37847][CORE][SHUFFLE] PushBlockStreamCallback#isStale should check null to avoid NPE
    
    ### What changes were proposed in this pull request?
    
    Check `null` in `isStale` to avoid NPE.
    
    ### Why are the changes needed?
    
    A late push shuffle block request may invoke `PushBlockStreamCallback#onData` after the merged partition has been finalized, which causes an NPE.
    
    ```
    2022-01-07 21:06:14,464 INFO shuffle.RemoteBlockPushResolver: shuffle partition application_1640143179334_0149_-1 102 6922, chunk_size=1, meta_length=138, data_length=112632
    2022-01-07 21:06:14,615 ERROR shuffle.RemoteBlockPushResolver: Encountered issue when merging shufflePush_102_0_279_6922
    java.lang.NullPointerException
            at org.apache.spark.network.shuffle.RemoteBlockPushResolver$AppShuffleMergePartitionsInfo.access$200(RemoteBlockPushResolver.java:1017)
            at org.apache.spark.network.shuffle.RemoteBlockPushResolver$PushBlockStreamCallback.isStale(RemoteBlockPushResolver.java:806)
            at org.apache.spark.network.shuffle.RemoteBlockPushResolver$PushBlockStreamCallback.onData(RemoteBlockPushResolver.java:840)
            at org.apache.spark.network.server.TransportRequestHandler$3.onData(TransportRequestHandler.java:209)
            at org.apache.spark.network.client.StreamInterceptor.handle(StreamInterceptor.java:79)
            at org.apache.spark.network.util.TransportFrameDecoder.feedInterceptor(TransportFrameDecoder.java:263)
            at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:87)
            at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
            at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
            at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
            at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
            at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
            at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
            at org.sparkproject.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
            at org.sparkproject.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
            at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
            at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
            at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
            at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
            at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
            at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:748)
    ```
    
    `isTooLate` checks for `null` but `isStale` does not, so add the same `null` check to `isStale` to avoid the NPE. For reference, `isTooLate`:
    ```java
    private boolean isTooLate(
        AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo,
        int reduceId) {
      return null == appShuffleMergePartitionsInfo ||
        INDETERMINATE_SHUFFLE_FINALIZED == appShuffleMergePartitionsInfo.shuffleMergePartitions ||
        !appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
    }
    ```
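    
    The null-safe check can be illustrated in isolation with a minimal sketch. It is not the resolver's actual code: `StaleCheckSketch`, `MergeInfo`, and the `shuffles` map are hypothetical stand-ins, and removing the map entry is only an assumed way to model a lookup that returns `null` when a push arrives late.
    
    ```java
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    public class StaleCheckSketch {
      /** Hypothetical stand-in for AppShuffleMergePartitionsInfo; only the field used here. */
      static final class MergeInfo {
        final int shuffleMergeId;
    
        MergeInfo(int shuffleMergeId) {
          this.shuffleMergeId = shuffleMergeId;
        }
      }
    
      /** Treats a missing (null) info object as stale instead of dereferencing it. */
      static boolean isStale(MergeInfo info, int shuffleMergeId) {
        return null == info || info.shuffleMergeId > shuffleMergeId;
      }
    
      public static void main(String[] args) {
        // Hypothetical registry keyed by shuffle id.
        Map<Integer, MergeInfo> shuffles = new ConcurrentHashMap<>();
        shuffles.put(102, new MergeInfo(0));
    
        // Info present and merge ids match: not stale.
        System.out.println(isStale(shuffles.get(102), 0)); // prints "false"
    
        // Assumption: the entry is gone by the time a late push arrives,
        // so the lookup returns null. The check reports stale, no NPE.
        shuffles.remove(102);
        System.out.println(isStale(shuffles.get(102), 0)); // prints "true"
      }
    }
    ```
    
    Because `||` short-circuits, `info.shuffleMergeId` is never read when `info` is `null`.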
    
    ### Does this PR introduce _any_ user-facing change?
    Bug fix: avoids an NPE in the YARN external shuffle service (ESS).
    
    ### How was this patch tested?
    I don't think it's easy to write a unit test for this issue based on the current code. Since it's a minor change, existing unit tests are used to ensure the change doesn't break current functionality.
    
    Closes #35146 from pan3793/SPARK-37847.
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../org/apache/spark/network/shuffle/RemoteBlockPushResolver.java    | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index d0eb4ae..d626cc3 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -789,13 +789,14 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
 
     /**
-     * If appShuffleMergePartitionsInfo's shuffleMergeId is
+     * If appShuffleMergePartitionsInfo is null or shuffleMergeId is
      * greater than the request shuffleMergeId then it is a stale block push.
      */
     private boolean isStale(
         AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo,
         int shuffleMergeId) {
-      return appShuffleMergePartitionsInfo.shuffleMergeId > shuffleMergeId;
+      return null == appShuffleMergePartitionsInfo ||
+        appShuffleMergePartitionsInfo.shuffleMergeId > shuffleMergeId;
     }
 
     /**
