You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Saisai Shao (Jira)" <ji...@apache.org> on 2020/09/30 03:18:00 UTC

[jira] [Updated] (SPARK-13704) TaskSchedulerImpl.createTaskSetManager can be expensive, and result in lost executors due to blocked heartbeats

     [ https://issues.apache.org/jira/browse/SPARK-13704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Saisai Shao updated SPARK-13704:
--------------------------------
    Description: 
In some cases, TaskSchedulerImpl.createTaskSetManager can be expensive. For example, in a Yarn cluster, it may call the topology script for rack awareness. When submit a very large job in a very large Yarn cluster, the topology script may take signifiant time to run. And this blocks receiving executors' heartbeats, which may result in lost executors

Stacktraces we observed which is related to this issue:
{code}https://issues.apache.org/jira/browse/SPARK-13704#
"dag-scheduler-event-loop" daemon prio=10 tid=0x00007f8392875800 nid=0x26e8 runnable [0x00007f83576f4000]
   java.lang.Thread.State: RUNNABLE
        at java.io.FileInputStream.readBytes(Native Method)
        at java.io.FileInputStream.read(FileInputStream.java:272)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
        - locked <0x00000000f551f460> (a java.lang.UNIXProcess$ProcessPipeInputStream)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
        - locked <0x00000000f5529740> (a java.io.InputStreamReader)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:154)
        at java.io.BufferedReader.read1(BufferedReader.java:205)
        at java.io.BufferedReader.read(BufferedReader.java:279)
        - locked <0x00000000f5529740> (a java.io.InputStreamReader)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:728)
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:524)
        at org.apache.hadoop.util.Shell.run(Shell.java:455)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
        at org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.runResolveCommand(ScriptBasedMapping.java:251)
        at org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.resolve(ScriptBasedMapping.java:188)
        at org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119)
        at org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
        at org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
        at org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:38)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:210)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:189)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.TaskSetManager.org$apache$spark$scheduler$TaskSetManager$$addPendingTask(TaskSetManager.scala:189)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$1.apply$mcVI$sp(TaskSetManager.scala:158)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.scheduler.TaskSetManager.<init>(TaskSetManager.scala:157)
        at org.apache.spark.scheduler.TaskSchedulerImpl.createTaskSetManager(TaskSchedulerImpl.scala:187)
        at org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:161)
        - locked <0x00000000ea3b8a88> (a org.apache.spark.scheduler.cluster.YarnScheduler)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:872)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

"sparkDriver-akka.actor.default-dispatcher-15" daemon prio=10 tid=0x00007f829c020000 nid=0x2737 waiting for monitor entry [0x00007f8355ebd000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:362)
        - waiting to lock <0x00000000ea3b8a88> (a org.apache.spark.scheduler.cluster.YarnScheduler)
        at org.apache.spark.HeartbeatReceiver$$anonfun$receiveWithLogging$1.applyOrElse(HeartbeatReceiver.scala:46)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

  was:
In some cases, TaskSchedulerImpl.createTaskSetManager can be expensive. For example, in a Yarn cluster, it may call the topology script for rack awareness. When submit a very large job in a very large Yarn cluster, the topology script may take signifiant time to run. And this blocks receiving executors' heartbeats, which may result in lost executors

Stacktraces we observed which is related to this issue:
{code}
"dag-scheduler-event-loop" daemon prio=10 tid=0x00007f8392875800 nid=0x26e8 runnable [0x00007f83576f4000]
   java.lang.Thread.State: RUNNABLE
        at java.io.FileInputStream.readBytes(Native Method)
        at java.io.FileInputStream.read(FileInputStream.java:272)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
        - locked <0x00000000f551f460> (a java.lang.UNIXProcess$ProcessPipeInputStream)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
        - locked <0x00000000f5529740> (a java.io.InputStreamReader)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:154)
        at java.io.BufferedReader.read1(BufferedReader.java:205)
        at java.io.BufferedReader.read(BufferedReader.java:279)
        - locked <0x00000000f5529740> (a java.io.InputStreamReader)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:728)
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:524)
        at org.apache.hadoop.util.Shell.run(Shell.java:455)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
        at org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.runResolveCommand(ScriptBasedMapping.java:251)
        at org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.resolve(ScriptBasedMapping.java:188)
        at org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119)
        at org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
        at org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
        at org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:38)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:210)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:189)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.TaskSetManager.org$apache$spark$scheduler$TaskSetManager$$addPendingTask(TaskSetManager.scala:189)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$1.apply$mcVI$sp(TaskSetManager.scala:158)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.scheduler.TaskSetManager.<init>(TaskSetManager.scala:157)
        at org.apache.spark.scheduler.TaskSchedulerImpl.createTaskSetManager(TaskSchedulerImpl.scala:187)
        at org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:161)
        - locked <0x00000000ea3b8a88> (a org.apache.spark.scheduler.cluster.YarnScheduler)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:872)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

"sparkDriver-akka.actor.default-dispatcher-15" daemon prio=10 tid=0x00007f829c020000 nid=0x2737 waiting for monitor entry [0x00007f8355ebd000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:362)
        - waiting to lock <0x00000000ea3b8a88> (a org.apache.spark.scheduler.cluster.YarnScheduler)
        at org.apache.spark.HeartbeatReceiver$$anonfun$receiveWithLogging$1.applyOrElse(HeartbeatReceiver.scala:46)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}


> TaskSchedulerImpl.createTaskSetManager can be expensive, and result in lost executors due to blocked heartbeats
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-13704
>                 URL: https://issues.apache.org/jira/browse/SPARK-13704
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.3.1, 1.4.1, 1.5.2, 1.6.0
>            Reporter: Zhong Wang
>            Assignee: Lantao Jin
>            Priority: Major
>             Fix For: 3.0.0
>
>
> In some cases, TaskSchedulerImpl.createTaskSetManager can be expensive. For example, in a Yarn cluster, it may call the topology script for rack awareness. When submit a very large job in a very large Yarn cluster, the topology script may take signifiant time to run. And this blocks receiving executors' heartbeats, which may result in lost executors
> Stacktraces we observed which is related to this issue:
> {code}https://issues.apache.org/jira/browse/SPARK-13704#
> "dag-scheduler-event-loop" daemon prio=10 tid=0x00007f8392875800 nid=0x26e8 runnable [0x00007f83576f4000]
>    java.lang.Thread.State: RUNNABLE
>         at java.io.FileInputStream.readBytes(Native Method)
>         at java.io.FileInputStream.read(FileInputStream.java:272)
>         at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>         at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>         - locked <0x00000000f551f460> (a java.lang.UNIXProcess$ProcessPipeInputStream)
>         at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
>         at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
>         at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
>         - locked <0x00000000f5529740> (a java.io.InputStreamReader)
>         at java.io.InputStreamReader.read(InputStreamReader.java:184)
>         at java.io.BufferedReader.fill(BufferedReader.java:154)
>         at java.io.BufferedReader.read1(BufferedReader.java:205)
>         at java.io.BufferedReader.read(BufferedReader.java:279)
>         - locked <0x00000000f5529740> (a java.io.InputStreamReader)
>         at org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:728)
>         at org.apache.hadoop.util.Shell.runCommand(Shell.java:524)
>         at org.apache.hadoop.util.Shell.run(Shell.java:455)
>         at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
>         at org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.runResolveCommand(ScriptBasedMapping.java:251)
>         at org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.resolve(ScriptBasedMapping.java:188)
>         at org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119)
>         at org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
>         at org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
>         at org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:38)
>         at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:210)
>         at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:189)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.TaskSetManager.org$apache$spark$scheduler$TaskSetManager$$addPendingTask(TaskSetManager.scala:189)
>         at org.apache.spark.scheduler.TaskSetManager$$anonfun$1.apply$mcVI$sp(TaskSetManager.scala:158)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>         at org.apache.spark.scheduler.TaskSetManager.<init>(TaskSetManager.scala:157)
>         at org.apache.spark.scheduler.TaskSchedulerImpl.createTaskSetManager(TaskSchedulerImpl.scala:187)
>         at org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:161)
>         - locked <0x00000000ea3b8a88> (a org.apache.spark.scheduler.cluster.YarnScheduler)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:872)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
>         at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> "sparkDriver-akka.actor.default-dispatcher-15" daemon prio=10 tid=0x00007f829c020000 nid=0x2737 waiting for monitor entry [0x00007f8355ebd000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:362)
>         - waiting to lock <0x00000000ea3b8a88> (a org.apache.spark.scheduler.cluster.YarnScheduler)
>         at org.apache.spark.HeartbeatReceiver$$anonfun$receiveWithLogging$1.applyOrElse(HeartbeatReceiver.scala:46)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
>         at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>         at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org