Posted to commits@spark.apache.org by mr...@apache.org on 2022/01/11 05:05:21 UTC
[spark] branch master updated: [SPARK-37847][CORE][SHUFFLE] PushBlockStreamCallback#isStale should check null to avoid NPE
This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7463564 [SPARK-37847][CORE][SHUFFLE] PushBlockStreamCallback#isStale should check null to avoid NPE
7463564 is described below
commit 74635649b230fd76199efe3da4bd3e4112894c4a
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Mon Jan 10 23:04:39 2022 -0600
[SPARK-37847][CORE][SHUFFLE] PushBlockStreamCallback#isStale should check null to avoid NPE
### What changes were proposed in this pull request?
Check `null` in `isStale` to avoid NPE.
### Why are the changes needed?
There is a chance that a late push shuffle block request invokes `PushBlockStreamCallback#onData` after the merged partition has been finalized, which causes an NPE.
```
2022-01-07 21:06:14,464 INFO shuffle.RemoteBlockPushResolver: shuffle partition application_1640143179334_0149_-1 102 6922, chunk_size=1, meta_length=138, data_length=112632
2022-01-07 21:06:14,615 ERROR shuffle.RemoteBlockPushResolver: Encountered issue when merging shufflePush_102_0_279_6922
java.lang.NullPointerException
at org.apache.spark.network.shuffle.RemoteBlockPushResolver$AppShuffleMergePartitionsInfo.access$200(RemoteBlockPushResolver.java:1017)
at org.apache.spark.network.shuffle.RemoteBlockPushResolver$PushBlockStreamCallback.isStale(RemoteBlockPushResolver.java:806)
at org.apache.spark.network.shuffle.RemoteBlockPushResolver$PushBlockStreamCallback.onData(RemoteBlockPushResolver.java:840)
at org.apache.spark.network.server.TransportRequestHandler$3.onData(TransportRequestHandler.java:209)
at org.apache.spark.network.client.StreamInterceptor.handle(StreamInterceptor.java:79)
at org.apache.spark.network.util.TransportFrameDecoder.feedInterceptor(TransportFrameDecoder.java:263)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:87)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.sparkproject.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at org.sparkproject.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
```
`isTooLate` checks for null but `isStale` does not, so check `isTooLate` first to avoid the NPE:
```java
private boolean isTooLate(
    AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo,
    int reduceId) {
  return null == appShuffleMergePartitionsInfo ||
    INDETERMINATE_SHUFFLE_FINALIZED == appShuffleMergePartitionsInfo.shuffleMergePartitions ||
    !appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
}
```
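For illustration, here is a minimal, self-contained sketch of the null-safe staleness check. The class and field names mirror the patch, but this is a simplified stand-in, not the actual `RemoteBlockPushResolver` code:

```java
// Sketch of the null-safe isStale check; AppShuffleMergePartitionsInfo is
// reduced to the one field the check reads.
public class IsStaleSketch {
  static class AppShuffleMergePartitionsInfo {
    final int shuffleMergeId;
    AppShuffleMergePartitionsInfo(int shuffleMergeId) {
      this.shuffleMergeId = shuffleMergeId;
    }
  }

  // After the fix: a null info (e.g. the partition was already finalized and
  // removed) is treated as stale instead of being dereferenced, so a late
  // block push is rejected rather than throwing an NPE.
  static boolean isStale(AppShuffleMergePartitionsInfo info, int shuffleMergeId) {
    return null == info || info.shuffleMergeId > shuffleMergeId;
  }

  public static void main(String[] args) {
    System.out.println(isStale(null, 0));                                  // true: no NPE
    System.out.println(isStale(new AppShuffleMergePartitionsInfo(2), 1));  // true: newer merge id
    System.out.println(isStale(new AppShuffleMergePartitionsInfo(1), 1));  // false: current merge
  }
}
```

The short-circuiting `||` guarantees the field access on the right never runs when the reference is null, which is exactly why ordering the null check first is sufficient.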
### Does this PR introduce _any_ user-facing change?
Bugfix, to avoid NPE in Yarn ESS.
### How was this patch tested?
I don't think it's easy to write a unit test for this issue with the current code structure; since it's a minor change, existing unit tests are used to ensure the change doesn't break current functionality.
Closes #35146 from pan3793/SPARK-37847.
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../org/apache/spark/network/shuffle/RemoteBlockPushResolver.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index d0eb4ae..d626cc3 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -789,13 +789,14 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
}
/**
- * If appShuffleMergePartitionsInfo's shuffleMergeId is
+ * If appShuffleMergePartitionsInfo is null or shuffleMergeId is
* greater than the request shuffleMergeId then it is a stale block push.
*/
private boolean isStale(
AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo,
int shuffleMergeId) {
- return appShuffleMergePartitionsInfo.shuffleMergeId > shuffleMergeId;
+ return null == appShuffleMergePartitionsInfo ||
+ appShuffleMergePartitionsInfo.shuffleMergeId > shuffleMergeId;
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org