Posted to commits@spark.apache.org by wu...@apache.org on 2021/08/05 07:33:58 UTC

[spark] branch branch-3.2 updated: [SPARK-36391][SHUFFLE] When state is removed, an NPE will be thrown, and we should improve the error message

This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new b4c065b  [SPARK-36391][SHUFFLE] When state is removed, an NPE will be thrown, and we should improve the error message
b4c065b is described below

commit b4c065b7dbd457f85003d3658b17bb254f88577b
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Thu Aug 5 15:30:00 2021 +0800

    [SPARK-36391][SHUFFLE] When state is removed, an NPE will be thrown, and we should improve the error message
    
    ### What changes were proposed in this pull request?
    When a channel is terminated, `connectionTerminated` is called and the corresponding StreamState is removed;
    any request that then arrives for that StreamState throws an NPE like:
    ```
    2021-07-31 22:00:24,810 ERROR server.ChunkFetchRequestHandler (ChunkFetchRequestHandler.java:lambda$respond$1(146)) - Error sending result ChunkFetchFailure[streamChunkId=StreamChunkId[streamId=1119950114515,chunkIndex=0],errorString=java.lang.NullPointerException
    	at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:80)
    	at org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:101)
    	at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:82)
    	at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
    	at org.sparkproject.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
    	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
    	at org.sparkproject.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    	at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    	at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    	at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    	at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    	at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    	at java.lang.Thread.run(Thread.java:748)
    ] to /ip:53818; closing connection
    java.nio.channels.ClosedChannelException
    	at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
    	at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
    	at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    	at org.sparkproject.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:112)
    	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    	at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
    	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
    	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
    	at org.sparkproject.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    	at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    	at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
    	at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    	at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    	at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    	at java.lang.Thread.run(Thread.java:748)
    ```
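    
    For context, the NPE originates in the cleanup that runs on channel termination. A simplified sketch of that path (paraphrased, not the verbatim Spark source; the `associatedChannel` field and the buffer-release loop are assumptions about the class's internals):
    ```
    // Sketch: on channel termination every StreamState tied to the channel is
    // dropped from the map, so a fetch racing with the teardown sees
    // streams.get(streamId) == null and, before this patch, dereferenced it.
    @Override
    public void connectionTerminated(Channel channel) {
      for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
        StreamState state = entry.getValue();
        if (state.associatedChannel == channel) {
          streams.remove(entry.getKey());     // later getChunk() now sees null
          while (state.buffers.hasNext()) {
            state.buffers.next().release();   // free the chunks still queued
          }
        }
      }
    }
    ```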
    
    Since the JVM omits the stack trace of an NPE once it has been thrown many times (HotSpot's fast-throw optimization), all that remains in the log is:
    ```
    2021-07-28 08:25:44,720 ERROR server.ChunkFetchRequestHandler (ChunkFetchRequestHandler.java:lambda$respond$1(146)) - Error sending result ChunkFetchFailure[streamChunkId=StreamChunkId[streamId=1187623335353,chunkIndex=11],errorString=java.lang.NullPointerException
    ] to /10.130.10.5:42148; closing connection
    java.nio.channels.ClosedChannelException
    ```
    This leaves users confused, so we should improve the error message.
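    
    The vanishing trace is HotSpot's fast-throw optimization: after the same implicit exception has been thrown many times in hot code, the JIT may substitute a preallocated, stackless instance. A standalone demo (hypothetical class name; the exact threshold depends on the JIT, and running with -XX:-OmitStackTraceInFastThrow keeps full traces):
    ```
    public class FastThrowDemo {
      public static void main(String[] args) {
        String s = args.length > 0 ? args[0] : null;  // null unless an arg is given
        for (int i = 0; i < 1_000_000; i++) {
          try {
            s.length();                               // implicit NPE on each pass
          } catch (NullPointerException e) {
            if (e.getStackTrace().length == 0) {      // JIT swapped in the stackless NPE
              System.out.println("stackless NPE after " + i + " throws");
              return;
            }
          }
        }
        System.out.println("stack traces were kept on this JVM");
      }
    }
    ```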
    
    ### Why are the changes needed?
    Improve the error message: a fetch against a closed stream now fails with a clear IllegalStateException instead of a bare NullPointerException.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
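    
    Not the patch's actual test, but a minimal sketch of how the new behavior could be exercised (assumes the 3.2-era `registerStream(appId, buffers, channel)` signature and uses Netty's `EmbeddedChannel` as a stand-in for a real connection):
    ```
    import java.util.Collections;
    import io.netty.channel.Channel;
    import io.netty.channel.embedded.EmbeddedChannel;
    import org.apache.spark.network.buffer.ManagedBuffer;
    import org.apache.spark.network.server.OneForOneStreamManager;

    public class ClosedStreamCheck {
      public static void main(String[] args) {
        OneForOneStreamManager manager = new OneForOneStreamManager();
        Channel channel = new EmbeddedChannel();
        long streamId = manager.registerStream(
            "app-0", Collections.<ManagedBuffer>emptyIterator(), channel);
        manager.connectionTerminated(channel);  // drops the StreamState
        try {
          manager.getChunk(streamId, 0);        // previously failed with a bare NPE
        } catch (IllegalStateException e) {
          System.out.println(e.getMessage());
          // "Requested chunk not available since streamId <id> is closed"
        }
      }
    }
    ```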
    
    Closes #33622 from AngersZhuuuu/SPARK-36391.
    
    Lead-authored-by: Angerszhuuuu <an...@gmail.com>
    Co-authored-by: Angerszhuuuu <an...@gmail.com>
    Signed-off-by: yi.wu <yi...@databricks.com>
    (cherry picked from commit b377ea26e2f03e263f317caeabbf8ec866e890c3)
    Signed-off-by: yi.wu <yi...@databricks.com>
---
 .../java/org/apache/spark/network/server/OneForOneStreamManager.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
index 1a902a9..dfa31c0 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
@@ -76,7 +76,10 @@ public class OneForOneStreamManager extends StreamManager {
   @Override
   public ManagedBuffer getChunk(long streamId, int chunkIndex) {
     StreamState state = streams.get(streamId);
-    if (chunkIndex != state.curChunk) {
+    if (state == null) {
+      throw new IllegalStateException(String.format(
+        "Requested chunk not available since streamId %s is closed", streamId));
+    } else if (chunkIndex != state.curChunk) {
       throw new IllegalStateException(String.format(
         "Received out-of-order chunk index %s (expected %s)", chunkIndex, state.curChunk));
     } else if (!state.buffers.hasNext()) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org