Posted to issues@spark.apache.org by "Chowdary Davuluri (Jira)" <ji...@apache.org> on 2021/09/20 22:21:00 UTC

[jira] [Updated] (SPARK-36812) Spark on K8s without External Shuffle Service: Improve job failure details on missing shuffle data when blacklisting is enabled

     [ https://issues.apache.org/jira/browse/SPARK-36812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chowdary Davuluri updated SPARK-36812:
--------------------------------------
    Description: 
This behavior is observed with Spark (all versions) on K8s when executor blacklisting is enabled and no External Shuffle Service is used.

Currently, when a stage is aborted, the failure reason shown in the driver logs (and surfaced in the event logs) only includes the details of the most recently failed task. When executor blacklisting is enabled without an External Shuffle Service, that most recent failure is almost always an org.apache.spark.shuffle.MetadataFetchFailedException: the blacklisted executor is killed, its shuffle output is lost, and the next stage fails while fetching it. Enhancing the abort message to also include the details of the earlier task failure that got the executor blacklisted, and thereby caused the subsequent shuffle fetch failure, would make it much easier for users to find the real root cause.
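
Until such an enhancement exists, the root-cause exception is only visible in the executor logs and in task-end events. As a purely illustrative workaround (not part of this proposal), a SparkListener can record the first exception-type task failure per stage so it can be correlated with the later MetadataFetchFailedException; a minimal sketch assuming Log4j for logging (the class name and logging choices are made up):
{code:java}
import org.apache.log4j.LogManager
import org.apache.spark.ExceptionFailure
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

import scala.collection.mutable

// Remembers the first exception-type task failure seen in each stage so the
// root cause can be printed next to the eventual stage-abort reason.
class RootCauseListener extends SparkListener {
  private val logger = LogManager.getLogger("RootCauseListener")
  private val firstFailureByStage = mutable.Map[Int, String]()

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = taskEnd.reason match {
    case ef: ExceptionFailure if !firstFailureByStage.contains(taskEnd.stageId) =>
      firstFailureByStage(taskEnd.stageId) = s"${ef.className}: ${ef.description}"
      logger.warn(s"Stage ${taskEnd.stageId} first task failure: ${ef.fullStackTrace}")
    case _ => // ignore successes, fetch failures, task kills, etc.
  }
}

// Register on the SparkContext before running the job:
//   context.addSparkListener(new RootCauseListener())
{code}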

 

Code to reproduce the issue:
{code:java}
import org.apache.log4j.LogManager
import org.apache.spark.Partitioner
import org.apache.spark.sql.SparkSession

object TestApp {
  def main(args: Array[String]): Unit = {
    val jobName = "TestApp"
    val logger  = LogManager.getLogger("TestApp")
    try {
      logger.info("Starting execution..")
      val maxCount = 100
      // Pairs of (partition key, value): (0, 1), (1, 2), ..., (99, 100)
      val numbers = (1 to maxCount).zipWithIndex.map(_.swap)
      val context = SparkSession.builder().appName(jobName).getOrCreate().sparkContext
      context
        .parallelize(numbers, maxCount)
        .partitionBy(new Partitioner {
          override def numPartitions: Int          = maxCount
          override def getPartition(key: Any): Int = key.asInstanceOf[Int]
        })
        .map {
          case (_, number) =>
            // Fail every task deterministically so the executor gets blacklisted
            if (true) sys.error("Something bad has happened")
            else number
        }
        .count()
    } catch {
      case e: Exception =>
        logger.error(s"$jobName error in main", e)
    }
  }
}
{code}
Config used:
{code:java}
"spark.executor.memory": "2G",
"spark.blacklist.killBlacklistedExecutors": "true",
"spark.blacklist.enabled": "true",
"spark.blacklist.application.maxFailedTasksPerExecutor": "1",
"spark.blacklist.timeout": "10800s",
"spark.blacklist.task.maxTaskAttemptsPerNode": "3",
"spark.blacklist.task.maxTaskAttemptsPerExecutor": "1",
"spark.blacklist.stage.maxFailedTasksPerExecutor": "1",
"spark.blacklist.stage.maxFailedExecutorsPerNode": "3",
"spark.blacklist.decommissioning.timeout": "1h",
"spark.blacklist.decommissioning.enabled": "true",
"spark.executor.instances": "3",
"spark.blacklist.application.maxFailedExecutorsPerNode": "3"
{code}
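For reference, the same settings can also be applied programmatically when building the session (they should be set before the SparkContext is created); a minimal sketch using the values above, with the builder chain itself being illustrative rather than part of the reproduction:
{code:java}
import org.apache.spark.sql.SparkSession

// Programmatic equivalent of a subset of the blacklist settings listed above.
// Blacklist and executor-count settings are read at scheduler startup, so
// they must be supplied before the SparkContext exists.
val spark = SparkSession.builder()
  .appName("TestApp")
  .config("spark.blacklist.enabled", "true")
  .config("spark.blacklist.killBlacklistedExecutors", "true")
  .config("spark.blacklist.application.maxFailedTasksPerExecutor", "1")
  .config("spark.blacklist.stage.maxFailedTasksPerExecutor", "1")
  .config("spark.blacklist.timeout", "10800s")
  .config("spark.executor.instances", "3")
  .getOrCreate()
{code}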
Error message in the driver log:
{noformat}
21/09/09 18:52:28 INFO DAGScheduler: Job 0 failed: count at TestApp.scala:30, took 35.360700 s
21/09/09 18:52:28 ERROR TestApp: TestApp error in main
org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 1 (count at TestApp.scala:30) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
        at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$4.apply(MapOutputTracker.scala:972)
        at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$4.apply(MapOutputTracker.scala:968)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:968)
        at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:759)
        at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:118)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:104)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2080)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2068)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2067)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2067)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1568)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2298)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2250)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2239)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:799)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD.count(RDD.scala:1213)
        at org.example.TestApp$.main(TestApp.scala:30)
        at org.example.TestApp.main(TestApp.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{noformat}

> Spark on K8s without External Shuffle Service: Improve job failure details on missing shuffle data when blacklisting is enabled
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-36812
>                 URL: https://issues.apache.org/jira/browse/SPARK-36812
>             Project: Spark
>          Issue Type: Improvement
>          Components: Kubernetes, Spark Core
>    Affects Versions: 2.4.7
>            Reporter: Chowdary Davuluri
>            Priority: Major
>


