You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/02/05 05:16:15 UTC

[spark] branch master updated: [SPARK-42343][CORE] Ignore `IOException` in `handleBlockRemovalFailure` if SparkContext is stopped

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 67285c3461f [SPARK-42343][CORE] Ignore `IOException` in `handleBlockRemovalFailure` if SparkContext is stopped
67285c3461f is described below

commit 67285c3461fe9e2ecb3a00e930115465ffc080ca
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Sat Feb 4 21:16:01 2023 -0800

    [SPARK-42343][CORE] Ignore `IOException` in `handleBlockRemovalFailure` if SparkContext is stopped
    
    ### What changes were proposed in this pull request?
    
    This PR aims to suppress verbose `IOException` warnings in `BlockManagerMasterEndpoint.handleBlockRemovalFailure` when the `SparkContext` is stopped (or when there is no active `SparkContext`).
    
    ### Why are the changes needed?
    
    Although these kinds of exceptions are already ignored during `SparkContext.stop`, they are still logged. This PR prevents misleading error messages like the following from being logged.
    ```
    23/02/04 01:26:05 INFO SparkUI: Stopped Spark web UI at http://driver-svc.default.svc:4040
    23/02/04 01:26:05 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
    23/02/04 01:26:05 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
    23/02/04 01:26:05 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
    23/02/04 01:26:05 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /172.31.215.139:47148 is closed
    23/02/04 01:26:05 WARN BlockManagerMasterEndpoint: Error trying to remove shuffle 0 from block manager BlockManagerId(3, 172.31.215.139, 37477, None)
    java.io.IOException: Connection from /172.31.215.139:47148 closed
            at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
            at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
            at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
            at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
            at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
            at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
            at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
            at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
            at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:831)
            at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
            at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
            at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
            at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.base/java.lang.Thread.run(Thread.java:829)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but this PR only changes log output, and the suppressed log messages occur only during `SparkContext.stop`.
    
    ### How was this patch tested?
    
    Manual review.
    
    Closes #39883 from dongjoon-hyun/SPARK-42343.
    
    Authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../org/apache/spark/storage/BlockManagerMasterEndpoint.scala     | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 681a812e880..47cab187ed8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
 
 import com.google.common.cache.CacheBuilder
 
-import org.apache.spark.{MapOutputTrackerMaster, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.network.shuffle.{ExternalBlockStoreClient, RemoteBlockPushResolver}
@@ -231,8 +231,10 @@ class BlockManagerMasterEndpoint(
       bmId: BlockManagerId,
       defaultValue: T): PartialFunction[Throwable, T] = {
     case e: IOException =>
-      logWarning(s"Error trying to remove $blockType $blockId" +
-        s" from block manager $bmId", e)
+      if (!SparkContext.getActive.map(_.isStopped).getOrElse(true)) {
+        logWarning(s"Error trying to remove $blockType $blockId" +
+          s" from block manager $bmId", e)
+      }
       defaultValue
 
     case t: TimeoutException =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org