Posted to commits@spark.apache.org by do...@apache.org on 2023/02/05 05:16:15 UTC
[spark] branch master updated: [SPARK-42343][CORE] Ignore `IOException` in `handleBlockRemovalFailure` if SparkContext is stopped
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 67285c3461f [SPARK-42343][CORE] Ignore `IOException` in `handleBlockRemovalFailure` if SparkContext is stopped
67285c3461f is described below
commit 67285c3461fe9e2ecb3a00e930115465ffc080ca
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Sat Feb 4 21:16:01 2023 -0800
[SPARK-42343][CORE] Ignore `IOException` in `handleBlockRemovalFailure` if SparkContext is stopped
### What changes were proposed in this pull request?
This PR aims to suppress verbose `IOException` warnings in `BlockManagerMasterEndpoint.handleBlockRemovalFailure` when the `SparkContext` has already been stopped.
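The guard added in the diff logs the warning only when an active `SparkContext` exists and has not been stopped. When no context is active, `getOrElse(true)` treats it as stopped, so the warning is dropped in that case too. A minimal sketch of the decision, assuming a hypothetical `Ctx` case class in place of `SparkContext` (only `isStopped` is modeled) and a hypothetical `shouldWarn` helper:

```scala
// Hypothetical stand-in for SparkContext: only the isStopped flag is modeled.
final case class Ctx(isStopped: Boolean)

object RemovalWarningGuard {
  // Same shape as the guard in the patch:
  //   !SparkContext.getActive.map(_.isStopped).getOrElse(true)
  // An absent context (None) falls back to `true`, i.e. "treat as stopped".
  def shouldWarn(active: Option[Ctx]): Boolean =
    !active.map(_.isStopped).getOrElse(true)

  def main(args: Array[String]): Unit = {
    println(shouldWarn(None))                          // false: no active context
    println(shouldWarn(Some(Ctx(isStopped = true))))   // false: context stopped
    println(shouldWarn(Some(Ctx(isStopped = false))))  // true: context running
  }
}
```

Defaulting to `true` errs on the side of silence: a missing context during shutdown is treated as a stopped one rather than a healthy one.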
### Why are the changes needed?
Although these kinds of exceptions are already ignored during `SparkContext.stop`, they still produce a misleading `BlockManagerMasterEndpoint` warning and stack trace, as in the following log. This PR prevents that warning.
```
23/02/04 01:26:05 INFO SparkUI: Stopped Spark web UI at http://driver-svc.default.svc:4040
23/02/04 01:26:05 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
23/02/04 01:26:05 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
23/02/04 01:26:05 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
23/02/04 01:26:05 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /172.31.215.139:47148 is closed
23/02/04 01:26:05 WARN BlockManagerMasterEndpoint: Error trying to remove shuffle 0 from block manager BlockManagerId(3, 172.31.215.139, 37477, None)
java.io.IOException: Connection from /172.31.215.139:47148 closed
at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:831)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
```
### Does this PR introduce _any_ user-facing change?
Yes, but this PR changes only logging, and the suppressed warnings occur only during `SparkContext.stop`.
### How was this patch tested?
Manual review.
Closes #39883 from dongjoon-hyun/SPARK-42343.
Authored-by: Dongjoon Hyun <do...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 681a812e880..47cab187ed8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
import com.google.common.cache.CacheBuilder
-import org.apache.spark.{MapOutputTrackerMaster, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.network.shuffle.{ExternalBlockStoreClient, RemoteBlockPushResolver}
@@ -231,8 +231,10 @@ class BlockManagerMasterEndpoint(
bmId: BlockManagerId,
defaultValue: T): PartialFunction[Throwable, T] = {
case e: IOException =>
- logWarning(s"Error trying to remove $blockType $blockId" +
- s" from block manager $bmId", e)
+ if (!SparkContext.getActive.map(_.isStopped).getOrElse(true)) {
+ logWarning(s"Error trying to remove $blockType $blockId" +
+ s" from block manager $bmId", e)
+ }
defaultValue
case t: TimeoutException =>
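`handleBlockRemovalFailure` returns a `PartialFunction[Throwable, T]` that turns a failed removal into a default value, so it can be passed to `Future.recover` on a removal RPC. The sketch below reproduces that shape with the standard library only; `removalFailureHandler`, the `contextStopped` flag and the `warn` callback are hypothetical names used for illustration, not Spark APIs.

```scala
import java.io.IOException
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RemovalFailureSketch {
  // Hypothetical analogue of handleBlockRemovalFailure: an IOException is mapped
  // to `defaultValue`, and the warning is emitted only while the context is running.
  def removalFailureHandler[T](
      blockType: String,
      blockId: String,
      contextStopped: => Boolean,
      warn: String => Unit,
      defaultValue: T): PartialFunction[Throwable, T] = {
    case e: IOException =>
      if (!contextStopped) {
        warn(s"Error trying to remove $blockType $blockId: ${e.getMessage}")
      }
      defaultValue
  }

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val warnings = ArrayBuffer.empty[String]
    val record: String => Unit = msg => warnings += msg

    // Simulates a removal RPC whose executor connection was closed.
    def failedRemoval(): Future[Boolean] =
      Future.failed(new IOException("Connection closed"))

    for (stopped <- Seq(true, false)) {
      val recovered = failedRemoval().recover(
        removalFailureHandler("shuffle", "0", stopped, record, defaultValue = false))
      val value = Await.result(recovered, 5.seconds)
      // stopped=true  -> result=false, warnings=0
      // stopped=false -> result=false, warnings=1
      println(s"stopped=$stopped -> result=$value, warnings=${warnings.size}")
    }
  }
}
```

Exceptions the partial function does not match (for example a `TimeoutException` in this sketch) are left untouched by `recover`, so the returned future still fails; the real method handles `TimeoutException` in a separate case, visible at the end of the hunk.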