You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Andrew Ash <an...@andrewash.com> on 2014/02/06 11:20:56 UTC

[0.9.0] Possible deadlock in shutdown hook?

Hi Spark devs,

Occasionally when hitting Ctrl-C in the scala spark shell on 0.9.0 one of
my workers goes dead in the spark master UI.  I'm using the standalone
cluster and didn't ever see this while using 0.8.0 so I think it may be a
regression.

When I prod on the hung CoarseGrainedExecutorBackend JVM with jstack and
jmap -heap, it doesn't respond unless I add the -F force flag.  The heap
isn't full, but there are some interesting bits in the jstack.  Poking
around a little, I think there may be some kind of deadlock in the shutdown
hooks.

Below are the threads I think are most interesting:

Thread 14308: (state = BLOCKED)
 - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
 - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
 - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
 -
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
scala.Function1) @bci=352, line=81 (Interpreted frame)
 - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25, line=498
(Interpreted frame)
 - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39, line=456
(Interpreted frame)
 - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237
(Interpreted frame)
 - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
 - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
@bci=4, line=386 (Interpreted frame)
 - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260
(Compiled frame)
 -
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
@bci=10, line=1339 (Compiled frame)
 -
scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
@bci=11, line=1979 (Compiled frame)
 - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14, line=107
(Interpreted frame)

Thread 3865: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
 - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
 - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106
(Interpreted frame)
 - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46 (Interpreted
frame)
 - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
 - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
 - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
 - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52
(Interpreted frame)
 - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)


Thread 3987: (state = BLOCKED)
 - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted frame)
 - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
 - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
 - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1,
line=466 (Interpreted frame)
 - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=9,
line=478 (Compiled frame)
 -
org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
@bci=4, line=479 (Compiled frame)
 -
org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
@bci=5, line=478 (Compiled frame)
 -
scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
scala.Function1) @bci=22, line=33 (Compiled frame)
 - scala.collection.mutable.WrappedArray.foreach(scala.Function1) @bci=2,
line=34 (Compiled frame)
 - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=19,
line=478 (Interpreted frame)
 -
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
@bci=14, line=141 (Interpreted frame)
 -
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
@bci=5, line=139 (Interpreted frame)
 -
scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
scala.Function1) @bci=22, line=33 (Compiled frame)
 - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2,
line=108 (Interpreted frame)
 - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39,
line=139 (Interpreted frame)


I think what happened here is that thread 14308 received the akka
"shutdown" message and called System.exit().  This started thread 3865,
which is the JVM shutting itself down.  Part of that process is running the
shutdown hooks, so it started thread 3987.  That thread is the shutdown
hook from addShutdownHook() in DiskBlockManager.scala, which looks like
this:

  private def addShutdownHook() {
    localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local
dirs") {
      override def run() {
        logDebug("Shutdown hook called")
        localDirs.foreach { localDir =>
          try {
            if (!Utils.hasRootAsShutdownDeleteDir(localDir))
Utils.deleteRecursively(localDir)
          } catch {
            case t: Throwable =>
              logError("Exception while deleting local spark dir: " +
localDir, t)
          }
        }

        if (shuffleSender != null) {
          shuffleSender.stop()
        }
      }
    })
  }

It goes through and deletes the directories recursively.  I was thinking
there might be some issues with concurrently-running shutdown hooks
deleting things out from underneath each other (shutdown hook javadocs say
they're all started in parallel if multiple hooks are added) causing the
File.list() in that last thread to take quite some time.

While I was looking through the stacktrace the JVM finally exited (after
15-20min at least) so I won't be able to debug more until this bug strikes
again.

Any ideas on what might be going on here?

Thanks!
Andrew

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Reynold Xin <rx...@databricks.com>.
Is it safe if we interrupt the running thread during shutdown?




On Thu, Feb 6, 2014 at 3:27 AM, Andrew Ash <an...@andrewash.com> wrote:

> Per the book Java Concurrency in Practice the already-running threads
> continue running while the shutdown hooks run.  So I think the race between
> the writing thread and the deleting thread could be a very real possibility
> :/
>
> http://stackoverflow.com/a/3332925/120915
>
>
> On Thu, Feb 6, 2014 at 2:49 AM, Andrew Ash <an...@andrewash.com> wrote:
>
> > Got a repro locally on my MBP (the other was on a CentOS machine).
> >
> > Build spark, run a master and a worker with the sbin/start-all.sh script,
> > then run this in a shell:
> >
> > import org.apache.spark.storage.StorageLevel._
> > val s = sc.parallelize(1 to 1000000000).persist(MEMORY_AND_DISK_SER);
> > s.count
> >
> > After about a minute, this line appears in the shell logging output:
> >
> > 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing BlockManager
> > BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent
> heart
> > beats: 57510ms exceeds 45000ms
> >
> > Ctrl-C the shell.  In jps there is now a worker, a master, and a
> > CoarseGrainedExecutorBackend.
> >
> > Run jstack on the CGEBackend JVM, and I got the attached stacktraces.  I
> > waited around for 15min then kill -9'd the JVM and restarted the process.
> >
> > I wonder if what's happening here is that the threads that are spewing
> > data to disk (as that parallelize and persist would do) can write to disk
> > faster than the cleanup threads can delete from disk.
> >
> > What do you think of that theory?
> >
> >
> > Andrew
> >
> >
> >
> > On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <mridul@gmail.com
> >wrote:
> >
> >> shutdown hooks should not take 15 mins are you mentioned !
> >> On the other hand, how busy was your disk when this was happening ?
> >> (either due to spark or something else ?)
> >>
> >> It might just be that there was a lot of stuff to remove ?
> >>
> >> Regards,
> >> Mridul
> >>
> >>
> >> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <an...@andrewash.com>
> wrote:
> >> > Hi Spark devs,
> >> >
> >> > Occasionally when hitting Ctrl-C in the scala spark shell on 0.9.0 one
> >> of
> >> > my workers goes dead in the spark master UI.  I'm using the standalone
> >> > cluster and didn't ever see this while using 0.8.0 so I think it may
> be
> >> a
> >> > regression.
> >> >
> >> > When I prod on the hung CoarseGrainedExecutorBackend JVM with jstack
> and
> >> > jmap -heap, it doesn't respond unless I add the -F force flag.  The
> heap
> >> > isn't full, but there are some interesting bits in the jstack.  Poking
> >> > around a little, I think there may be some kind of deadlock in the
> >> shutdown
> >> > hooks.
> >> >
> >> > Below are the threads I think are most interesting:
> >> >
> >> > Thread 14308: (state = BLOCKED)
> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
> >> >  - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
> >> >  - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
> >> >  -
> >> >
> >>
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
> >> > scala.Function1) @bci=352, line=81 (Interpreted frame)
> >> >  - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25,
> >> line=498
> >> > (Interpreted frame)
> >> >  - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39,
> line=456
> >> > (Interpreted frame)
> >> >  - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237
> >> > (Interpreted frame)
> >> >  - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
> >> >  - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
> >> > @bci=4, line=386 (Interpreted frame)
> >> >  - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260
> >> > (Compiled frame)
> >> >  -
> >> >
> >>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
> >> > @bci=10, line=1339 (Compiled frame)
> >> >  -
> >> >
> >>
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
> >> > @bci=11, line=1979 (Compiled frame)
> >> >  - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14,
> >> line=107
> >> > (Interpreted frame)
> >> >
> >> > Thread 3865: (state = BLOCKED)
> >> >  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
> >> >  - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
> >> >  - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
> >> >  - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106
> >> > (Interpreted frame)
> >> >  - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46
> >> (Interpreted
> >> > frame)
> >> >  - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
> >> >  - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
> >> >  - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52
> >> > (Interpreted frame)
> >> >  - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
> >> >  - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
> >> >
> >> >
> >> > Thread 3987: (state = BLOCKED)
> >> >  - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted
> frame)
> >> >  - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
> >> >  - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
> >> >  - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1,
> >> > line=466 (Interpreted frame)
> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> @bci=9,
> >> > line=478 (Compiled frame)
> >> >  -
> >> >
> >>
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
> >> > @bci=4, line=479 (Compiled frame)
> >> >  -
> >> >
> >>
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
> >> > @bci=5, line=478 (Compiled frame)
> >> >  -
> >> >
> >>
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> >> > scala.Function1) @bci=22, line=33 (Compiled frame)
> >> >  - scala.collection.mutable.WrappedArray.foreach(scala.Function1)
> >> @bci=2,
> >> > line=34 (Compiled frame)
> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> @bci=19,
> >> > line=478 (Interpreted frame)
> >> >  -
> >> >
> >>
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
> >> > @bci=14, line=141 (Interpreted frame)
> >> >  -
> >> >
> >>
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
> >> > @bci=5, line=139 (Interpreted frame)
> >> >  -
> >> >
> >>
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> >> > scala.Function1) @bci=22, line=33 (Compiled frame)
> >> >  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
> >> @bci=2,
> >> > line=108 (Interpreted frame)
> >> >  - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39,
> >> > line=139 (Interpreted frame)
> >> >
> >> >
> >> > I think what happened here is that thread 14308 received the akka
> >> > "shutdown" message and called System.exit().  This started thread
> 3865,
> >> > which is the JVM shutting itself down.  Part of that process is
> running
> >> the
> >> > shutdown hooks, so it started thread 3987.  That thread is the
> shutdown
> >> > hook from addShutdownHook() in DiskBlockManager.scala, which looks
> like
> >> > this:
> >> >
> >> >   private def addShutdownHook() {
> >> >     localDirs.foreach(localDir =>
> >> Utils.registerShutdownDeleteDir(localDir))
> >> >     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local
> >> > dirs") {
> >> >       override def run() {
> >> >         logDebug("Shutdown hook called")
> >> >         localDirs.foreach { localDir =>
> >> >           try {
> >> >             if (!Utils.hasRootAsShutdownDeleteDir(localDir))
> >> > Utils.deleteRecursively(localDir)
> >> >           } catch {
> >> >             case t: Throwable =>
> >> >               logError("Exception while deleting local spark dir: " +
> >> > localDir, t)
> >> >           }
> >> >         }
> >> >
> >> >         if (shuffleSender != null) {
> >> >           shuffleSender.stop()
> >> >         }
> >> >       }
> >> >     })
> >> >   }
> >> >
> >> > It goes through and deletes the directories recursively.  I was
> thinking
> >> > there might be some issues with concurrently-running shutdown hooks
> >> > deleting things out from underneath each other (shutdown hook javadocs
> >> say
> >> > they're all started in parallel if multiple hooks are added) causing
> the
> >> > File.list() in that last thread to take quite some time.
> >> >
> >> > While I was looking through the stacktrace the JVM finally exited
> (after
> >> > 15-20min at least) so I won't be able to debug more until this bug
> >> strikes
> >> > again.
> >> >
> >> > Any ideas on what might be going on here?
> >> >
> >> > Thanks!
> >> > Andrew
> >>
> >
> >
>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Andrew Ash <an...@andrewash.com>.
Per the book Java Concurrency in Practice the already-running threads
continue running while the shutdown hooks run.  So I think the race between
the writing thread and the deleting thread could be a very real possibility
:/

http://stackoverflow.com/a/3332925/120915


On Thu, Feb 6, 2014 at 2:49 AM, Andrew Ash <an...@andrewash.com> wrote:

> Got a repro locally on my MBP (the other was on a CentOS machine).
>
> Build spark, run a master and a worker with the sbin/start-all.sh script,
> then run this in a shell:
>
> import org.apache.spark.storage.StorageLevel._
> val s = sc.parallelize(1 to 1000000000).persist(MEMORY_AND_DISK_SER);
> s.count
>
> After about a minute, this line appears in the shell logging output:
>
> 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing BlockManager
> BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent heart
> beats: 57510ms exceeds 45000ms
>
> Ctrl-C the shell.  In jps there is now a worker, a master, and a
> CoarseGrainedExecutorBackend.
>
> Run jstack on the CGEBackend JVM, and I got the attached stacktraces.  I
> waited around for 15min then kill -9'd the JVM and restarted the process.
>
> I wonder if what's happening here is that the threads that are spewing
> data to disk (as that parallelize and persist would do) can write to disk
> faster than the cleanup threads can delete from disk.
>
> What do you think of that theory?
>
>
> Andrew
>
>
>
> On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <mr...@gmail.com>wrote:
>
>> shutdown hooks should not take 15 mins are you mentioned !
>> On the other hand, how busy was your disk when this was happening ?
>> (either due to spark or something else ?)
>>
>> It might just be that there was a lot of stuff to remove ?
>>
>> Regards,
>> Mridul
>>
>>
>> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <an...@andrewash.com> wrote:
>> > Hi Spark devs,
>> >
>> > Occasionally when hitting Ctrl-C in the scala spark shell on 0.9.0 one
>> of
>> > my workers goes dead in the spark master UI.  I'm using the standalone
>> > cluster and didn't ever see this while using 0.8.0 so I think it may be
>> a
>> > regression.
>> >
>> > When I prod on the hung CoarseGrainedExecutorBackend JVM with jstack and
>> > jmap -heap, it doesn't respond unless I add the -F force flag.  The heap
>> > isn't full, but there are some interesting bits in the jstack.  Poking
>> > around a little, I think there may be some kind of deadlock in the
>> shutdown
>> > hooks.
>> >
>> > Below are the threads I think are most interesting:
>> >
>> > Thread 14308: (state = BLOCKED)
>> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
>> >  - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
>> >  - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
>> >  -
>> >
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
>> > scala.Function1) @bci=352, line=81 (Interpreted frame)
>> >  - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25,
>> line=498
>> > (Interpreted frame)
>> >  - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39, line=456
>> > (Interpreted frame)
>> >  - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237
>> > (Interpreted frame)
>> >  - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
>> >  - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
>> > @bci=4, line=386 (Interpreted frame)
>> >  - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260
>> > (Compiled frame)
>> >  -
>> >
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
>> > @bci=10, line=1339 (Compiled frame)
>> >  -
>> >
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
>> > @bci=11, line=1979 (Compiled frame)
>> >  - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14,
>> line=107
>> > (Interpreted frame)
>> >
>> > Thread 3865: (state = BLOCKED)
>> >  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
>> >  - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
>> >  - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
>> >  - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106
>> > (Interpreted frame)
>> >  - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46
>> (Interpreted
>> > frame)
>> >  - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
>> >  - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
>> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
>> >  - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52
>> > (Interpreted frame)
>> >  - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
>> >  - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
>> >
>> >
>> > Thread 3987: (state = BLOCKED)
>> >  - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted frame)
>> >  - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
>> >  - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
>> >  - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1,
>> > line=466 (Interpreted frame)
>> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=9,
>> > line=478 (Compiled frame)
>> >  -
>> >
>> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
>> > @bci=4, line=479 (Compiled frame)
>> >  -
>> >
>> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
>> > @bci=5, line=478 (Compiled frame)
>> >  -
>> >
>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>> > scala.Function1) @bci=22, line=33 (Compiled frame)
>> >  - scala.collection.mutable.WrappedArray.foreach(scala.Function1)
>> @bci=2,
>> > line=34 (Compiled frame)
>> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=19,
>> > line=478 (Interpreted frame)
>> >  -
>> >
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
>> > @bci=14, line=141 (Interpreted frame)
>> >  -
>> >
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
>> > @bci=5, line=139 (Interpreted frame)
>> >  -
>> >
>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>> > scala.Function1) @bci=22, line=33 (Compiled frame)
>> >  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
>> @bci=2,
>> > line=108 (Interpreted frame)
>> >  - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39,
>> > line=139 (Interpreted frame)
>> >
>> >
>> > I think what happened here is that thread 14308 received the akka
>> > "shutdown" message and called System.exit().  This started thread 3865,
>> > which is the JVM shutting itself down.  Part of that process is running
>> the
>> > shutdown hooks, so it started thread 3987.  That thread is the shutdown
>> > hook from addShutdownHook() in DiskBlockManager.scala, which looks like
>> > this:
>> >
>> >   private def addShutdownHook() {
>> >     localDirs.foreach(localDir =>
>> Utils.registerShutdownDeleteDir(localDir))
>> >     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local
>> > dirs") {
>> >       override def run() {
>> >         logDebug("Shutdown hook called")
>> >         localDirs.foreach { localDir =>
>> >           try {
>> >             if (!Utils.hasRootAsShutdownDeleteDir(localDir))
>> > Utils.deleteRecursively(localDir)
>> >           } catch {
>> >             case t: Throwable =>
>> >               logError("Exception while deleting local spark dir: " +
>> > localDir, t)
>> >           }
>> >         }
>> >
>> >         if (shuffleSender != null) {
>> >           shuffleSender.stop()
>> >         }
>> >       }
>> >     })
>> >   }
>> >
>> > It goes through and deletes the directories recursively.  I was thinking
>> > there might be some issues with concurrently-running shutdown hooks
>> > deleting things out from underneath each other (shutdown hook javadocs
>> say
>> > they're all started in parallel if multiple hooks are added) causing the
>> > File.list() in that last thread to take quite some time.
>> >
>> > While I was looking through the stacktrace the JVM finally exited (after
>> > 15-20min at least) so I won't be able to debug more until this bug
>> strikes
>> > again.
>> >
>> > Any ideas on what might be going on here?
>> >
>> > Thanks!
>> > Andrew
>>
>
>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Andrew Ash <an...@andrewash.com>.
To keep this thread from getting lost, I've opened a ticket here:
https://spark-project.atlassian.net/browse/SPARK-1107


On Fri, Feb 7, 2014 at 12:53 AM, Andrew Ash <an...@andrewash.com> wrote:

> Agreed.  Also I'm happy to test any patches since I have a consistent
> repro now (see one of my first responses in this thread)
>
>
> On Fri, Feb 7, 2014 at 12:51 AM, Mridul Muralidharan <mr...@gmail.com>wrote:
>
>> This looks like the most reasonable approach to resolve this !
>>
>> Regards,
>> Mridul
>>
>>
>> On Fri, Feb 7, 2014 at 1:43 PM, Tathagata Das
>> <ta...@gmail.com> wrote:
>> > Or we can try adding a shutdown hook in the
>> > Executor<
>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L127
>> >to
>> > call threadPool.shutdownNow(). May have to catch the
>> > InterruptedException and handle it gracefully out
>> > here<
>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L255
>> >
>> > .
>> >
>> > TD
>> >
>> >
>> > On Thu, Feb 6, 2014 at 11:49 PM, Andrew Ash <an...@andrewash.com>
>> wrote:
>> >
>> >> I think we can enumerate all current threads with the ThreadMXBean,
>> filter
>> >> to those threads with the name of executor pool in them, and interrupt
>> >> them.
>> >>
>> >>
>> >>
>> http://docs.oracle.com/javase/6/docs/api/java/lang/management/ManagementFactory.html#getThreadMXBean%28%29
>> >>
>> >> The executor threads are currently named according to the pattern
>> "Executor
>> >> task launch worker-X"
>> >>
>> >>
>> >> On Thu, Feb 6, 2014 at 11:45 PM, Tathagata Das
>> >> <ta...@gmail.com>wrote:
>> >>
>> >> > That definitely sound more reliable. Worth trying out if there is a
>> >> > reliable way of reproducing the deadlock-like scenario.
>> >> >
>> >> > TD
>> >> >
>> >> >
>> >> > On Thu, Feb 6, 2014 at 11:38 PM, Matei Zaharia <
>> matei.zaharia@gmail.com
>> >> > >wrote:
>> >> >
>> >> > > I don't think we necessarily want to do this through the
>> DAGScheduler
>> >> > > because the worker might also shut down due to some unusual
>> termination
>> >> > > condition, like the driver node crashing. Can't we do it at the
>> top of
>> >> > the
>> >> > > shutdown hook instead? If all the threads are in the same thread
>> pool
>> >> it
>> >> > > might be possible to interrupt or stop the whole pool.
>> >> > >
>> >> > > Matei
>> >> > >
>> >> > > On Feb 6, 2014, at 11:30 PM, Andrew Ash <an...@andrewash.com>
>> wrote:
>> >> > >
>> >> > > > That's genius.  Of course when a worker is told to shutdown it
>> should
>> >> > > > interrupt its worker threads -- I think that would address this
>> >> issue.
>> >> > > >
>> >> > > > Are you thinking to put
>> >> > > >
>> >> > > > running.map(_.jobId).foreach { handleJobCancellation }
>> >> > > >
>> >> > > > at the top of the StopDAGScheduler block?
>> >> > > >
>> >> > > >
>> >> > > > On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das
>> >> > > > <ta...@gmail.com>wrote:
>> >> > > >
>> >> > > >> Its highly likely that the executor with the threadpool that
>> runs
>> >> the
>> >> > > tasks
>> >> > > >> are the only set of threads that writes to disk. The tasks are
>> >> > designed
>> >> > > to
>> >> > > >> be interrupted when the corresponding job is cancelled. So a
>> >> > reasonably
>> >> > > >> simple way could be to actually cancel the currently active
>> jobs,
>> >> > which
>> >> > > >> would send the signal to the worker to stop the tasks.
>> Currently,
>> >> the
>> >> > > >> DAGScheduler<
>> >> > > >>
>> >> > >
>> >> >
>> >>
>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610
>> >> > > >>> does
>> >> > > >> not seem to actually cancel the jobs, only mark them as failed.
>> So
>> >> it
>> >> > > >> may be a simple addition.
>> >> > > >>
>> >> > > >> There may be some complications with the external spilling of
>> >> shuffle
>> >> > > data
>> >> > > >> to disk not stopping immediately when the task is marked for
>> >> killing.
>> >> > > Gotta
>> >> > > >> try it out.
>> >> > > >>
>> >> > > >> TD
>> >> > > >>
>> >> > > >> On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <
>> andrew@andrewash.com>
>> >> > > wrote:
>> >> > > >>
>> >> > > >>> There is probably just one threadpool that has task threads --
>> is
>> >> it
>> >> > > >>> possible to enumerate and interrupt just those?  We may need to
>> >> keep
>> >> > > >> string
>> >> > > >>> a reference to that threadpool through to the shutdown thread
>> to
>> >> make
>> >> > > >> that
>> >> > > >>> happen.
>> >> > > >>>
>> >> > > >>>
>> >> > > >>> On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <
>> >> > mridul@gmail.com
>> >> > > >>>> wrote:
>> >> > > >>>
>> >> > > >>>> Ideally, interrupting the thread writing to disk should be
>> >> > sufficient
>> >> > > >>>> - though since we are in middle of shutdown when this is
>> >> happening,
>> >> > it
>> >> > > >>>> is best case effort anyway.
>> >> > > >>>> Identifying which threads to interrupt will be interesting
>> since
>> >> > most
>> >> > > >>>> of them are driven by threadpool's and we cant list all
>> threads
>> >> and
>> >> > > >>>> interrupt all of them !
>> >> > > >>>>
>> >> > > >>>>
>> >> > > >>>> Regards,
>> >> > > >>>> Mridul
>> >> > > >>>>
>> >> > > >>>>
>> >> > > >>>> On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <
>> andrew@andrewash.com>
>> >> > > >> wrote:
>> >> > > >>>>> I think the solution where we stop the writing threads and
>> then
>> >> let
>> >> > > >> the
>> >> > > >>>>> deleting threads completely clean up is the best option
>> since the
>> >> > > >> final
>> >> > > >>>>> state doesn't have half-deleted temp dirs scattered across
>> the
>> >> > > >> cluster.
>> >> > > >>>>>
>> >> > > >>>>> How feasible do you think it'd be to interrupt the other
>> threads?
>> >> > > >>>>>
>> >> > > >>>>>
>> >> > > >>>>> On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <
>> >> > > >> mridul@gmail.com
>> >> > > >>>>> wrote:
>> >> > > >>>>>
>> >> > > >>>>>> Looks like a pathological corner case here - where the the
>> >> delete
>> >> > > >>>>>> thread is not getting run while the OS is busy prioritizing
>> the
>> >> > > >> thread
>> >> > > >>>>>> writing data (probably with heavy gc too).
>> >> > > >>>>>> Ideally, the delete thread would list files, remove them and
>> >> then
>> >> > > >> fail
>> >> > > >>>>>> when it tries to remove the non empty directory (since other
>> >> > thread
>> >> > > >>>>>> might be creating more in parallel).
>> >> > > >>>>>>
>> >> > > >>>>>>
>> >> > > >>>>>> Regards,
>> >> > > >>>>>> Mridul
>> >> > > >>>>>>
>> >> > > >>>>>>
>> >> > > >>>>>> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <
>> >> andrew@andrewash.com>
>> >> > > >>>> wrote:
>> >> > > >>>>>>> Got a repro locally on my MBP (the other was on a CentOS
>> >> > machine).
>> >> > > >>>>>>>
>> >> > > >>>>>>> Build spark, run a master and a worker with the
>> >> sbin/start-all.sh
>> >> > > >>>> script,
>> >> > > >>>>>>> then run this in a shell:
>> >> > > >>>>>>>
>> >> > > >>>>>>> import org.apache.spark.storage.StorageLevel._
>> >> > > >>>>>>> val s = sc.parallelize(1 to
>> >> > > >>> 1000000000).persist(MEMORY_AND_DISK_SER);
>> >> > > >>>>>>> s.count
>> >> > > >>>>>>>
>> >> > > >>>>>>> After about a minute, this line appears in the shell
>> logging
>> >> > > >> output:
>> >> > > >>>>>>>
>> >> > > >>>>>>> 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing
>> >> > > >>> BlockManager
>> >> > > >>>>>>> BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with
>> no
>> >> > > >> recent
>> >> > > >>>>>> heart
>> >> > > >>>>>>> beats: 57510ms exceeds 45000ms
>> >> > > >>>>>>>
>> >> > > >>>>>>> Ctrl-C the shell.  In jps there is now a worker, a master,
>> and
>> >> a
>> >> > > >>>>>>> CoarseGrainedExecutorBackend.
>> >> > > >>>>>>>
>> >> > > >>>>>>> Run jstack on the CGEBackend JVM, and I got the attached
>> >> > > >>> stacktraces.
>> >> > > >>>> I
>> >> > > >>>>>>> waited around for 15min then kill -9'd the JVM and
>> restarted
>> >> the
>> >> > > >>>> process.
>> >> > > >>>>>>>
>> >> > > >>>>>>> I wonder if what's happening here is that the threads that
>> are
>> >> > > >>> spewing
>> >> > > >>>>>> data
>> >> > > >>>>>>> to disk (as that parallelize and persist would do) can
>> write to
>> >> > > >> disk
>> >> > > >>>>>> faster
>> >> > > >>>>>>> than the cleanup threads can delete from disk.
>> >> > > >>>>>>>
>> >> > > >>>>>>> What do you think of that theory?
>> >> > > >>>>>>>
>> >> > > >>>>>>>
>> >> > > >>>>>>> Andrew
>> >> > > >>>>>>>
>> >> > > >>>>>>>
>> >> > > >>>>>>>
>> >> > > >>>>>>> On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <
>> >> > > >>> mridul@gmail.com
>> >> > > >>>>>
>> >> > > >>>>>>> wrote:
>> >> > > >>>>>>>>
>> >> > > >>>>>>>> shutdown hooks should not take 15 mins are you mentioned !
>> >> > > >>>>>>>> On the other hand, how busy was your disk when this was
>> >> > > >> happening ?
>> >> > > >>>>>>>> (either due to spark or something else ?)
>> >> > > >>>>>>>>
>> >> > > >>>>>>>> It might just be that there was a lot of stuff to remove ?
>> >> > > >>>>>>>>
>> >> > > >>>>>>>> Regards,
>> >> > > >>>>>>>> Mridul
>> >> > > >>>>>>>>
>> >> > > >>>>>>>>
>> >> > > >>>>>>>> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <
>> >> > andrew@andrewash.com
>> >> > > >>>
>> >> > > >>>>>> wrote:
>> >> > > >>>>>>>>> Hi Spark devs,
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> Occasionally when hitting Ctrl-C in the scala spark
>> shell on
>> >> > > >>> 0.9.0
>> >> > > >>>> one
>> >> > > >>>>>>>>> of
>> >> > > >>>>>>>>> my workers goes dead in the spark master UI.  I'm using
>> the
>> >> > > >>>> standalone
>> >> > > >>>>>>>>> cluster and didn't ever see this while using 0.8.0 so I
>> think
>> >> > > >> it
>> >> > > >>>> may
>> >> > > >>>>>> be
>> >> > > >>>>>>>>> a
>> >> > > >>>>>>>>> regression.
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> When I prod on the hung CoarseGrainedExecutorBackend JVM
>> with
>> >> > > >>>> jstack
>> >> > > >>>>>> and
>> >> > > >>>>>>>>> jmap -heap, it doesn't respond unless I add the -F force
>> >> flag.
>> >> > > >>> The
>> >> > > >>>>>> heap
>> >> > > >>>>>>>>> isn't full, but there are some interesting bits in the
>> >> jstack.
>> >> > > >>>> Poking
>> >> > > >>>>>>>>> around a little, I think there may be some kind of
>> deadlock
>> >> in
>> >> > > >>> the
>> >> > > >>>>>>>>> shutdown
>> >> > > >>>>>>>>> hooks.
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> Below are the threads I think are most interesting:
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> Thread 14308: (state = BLOCKED)
>> >> > > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212
>> (Interpreted
>> >> > > >>>> frame)
>> >> > > >>>>>>>>> - java.lang.Runtime.exit(int) @bci=14, line=109
>> (Interpreted
>> >> > > >>>> frame)
>> >> > > >>>>>>>>> - java.lang.System.exit(int) @bci=4, line=962
>> (Interpreted
>> >> > > >>> frame)
>> >> > > >>>>>>>>> -
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>>
>> >> > > >>>>>>
>> >> > > >>>>
>> >> > > >>>
>> >> > > >>
>> >> > >
>> >> >
>> >>
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
>> >> > > >>>>>>>>> scala.Function1) @bci=352, line=81 (Interpreted frame)
>> >> > > >>>>>>>>> - akka.actor.ActorCell.receiveMessage(java.lang.Object)
>> >> > > >> @bci=25,
>> >> > > >>>>>>>>> line=498
>> >> > > >>>>>>>>> (Interpreted frame)
>> >> > > >>>>>>>>> - akka.actor.ActorCell.invoke(akka.dispatch.Envelope)
>> >> @bci=39,
>> >> > > >>>>>> line=456
>> >> > > >>>>>>>>> (Interpreted frame)
>> >> > > >>>>>>>>> - akka.dispatch.Mailbox.processMailbox(int, long)
>> @bci=24,
>> >> > > >>>> line=237
>> >> > > >>>>>>>>> (Interpreted frame)
>> >> > > >>>>>>>>> - akka.dispatch.Mailbox.run() @bci=20, line=219
>> (Interpreted
>> >> > > >>>> frame)
>> >> > > >>>>>>>>> -
>> >> > > >>>>
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
>> >> > > >>>>>>>>> @bci=4, line=386 (Interpreted frame)
>> >> > > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinTask.doExec()
>> @bci=10,
>> >> > > >>>> line=260
>> >> > > >>>>>>>>> (Compiled frame)
>> >> > > >>>>>>>>> -
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>>
>> >> > > >>>>>>
>> >> > > >>>>
>> >> > > >>>
>> >> > > >>
>> >> > >
>> >> >
>> >>
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
>> >> > > >>>>>>>>> @bci=10, line=1339 (Compiled frame)
>> >> > > >>>>>>>>> -
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>>
>> >> > > >>>>>>
>> >> > > >>>>
>> >> > > >>>
>> >> > > >>
>> >> > >
>> >> >
>> >>
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
>> >> > > >>>>>>>>> @bci=11, line=1979 (Compiled frame)
>> >> > > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinWorkerThread.run()
>> >> > > >> @bci=14,
>> >> > > >>>>>>>>> line=107
>> >> > > >>>>>>>>> (Interpreted frame)
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> Thread 3865: (state = BLOCKED)
>> >> > > >>>>>>>>> - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
>> >> > > >>>>>>>>> - java.lang.Thread.join(long) @bci=38, line=1280
>> (Interpreted
>> >> > > >>>> frame)
>> >> > > >>>>>>>>> - java.lang.Thread.join() @bci=2, line=1354 (Interpreted
>> >> > > >> frame)
>> >> > > >>>>>>>>> - java.lang.ApplicationShutdownHooks.runHooks() @bci=87,
>> >> > > >>> line=106
>> >> > > >>>>>>>>> (Interpreted frame)
>> >> > > >>>>>>>>> - java.lang.ApplicationShutdownHooks$1.run() @bci=0,
>> line=46
>> >> > > >>>>>>>>> (Interpreted
>> >> > > >>>>>>>>> frame)
>> >> > > >>>>>>>>> - java.lang.Shutdown.runHooks() @bci=39, line=123
>> >> (Interpreted
>> >> > > >>>> frame)
>> >> > > >>>>>>>>> - java.lang.Shutdown.sequence() @bci=26, line=167
>> >> (Interpreted
>> >> > > >>>> frame)
>> >> > > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212
>> (Interpreted
>> >> > > >>>> frame)
>> >> > > >>>>>>>>> - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8,
>> >> > > >> line=52
>> >> > > >>>>>>>>> (Interpreted frame)
>> >> > > >>>>>>>>> - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted
>> >> frame)
>> >> > > >>>>>>>>> - java.lang.Thread.run() @bci=11, line=744 (Interpreted
>> >> frame)
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> Thread 3987: (state = BLOCKED)
>> >> > > >>>>>>>>> - java.io.UnixFileSystem.list(java.io.File) @bci=0
>> >> > > >> (Interpreted
>> >> > > >>>>>> frame)
>> >> > > >>>>>>>>> - java.io.File.list() @bci=29, line=1116 (Interpreted
>> frame)
>> >> > > >>>>>>>>> - java.io.File.listFiles() @bci=1, line=1201 (Compiled
>> frame)
>> >> > > >>>>>>>>> -
>> org.apache.spark.util.Utils$.listFilesSafely(java.io.File)
>> >> > > >>>> @bci=1,
>> >> > > >>>>>>>>> line=466 (Interpreted frame)
>> >> > > >>>>>>>>> -
>> >> org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
>> >> > > >>>>>> @bci=9,
>> >> > > >>>>>>>>> line=478 (Compiled frame)
>> >> > > >>>>>>>>> -
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>>
>> >> > > >>>>>>
>> >> > > >>>>
>> >> > > >>>
>> >> > > >>
>> >> > >
>> >> >
>> >>
>> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
>> >> > > >>>>>>>>> @bci=4, line=479 (Compiled frame)
>> >> > > >>>>>>>>> -
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>>
>> >> > > >>>>>>
>> >> > > >>>>
>> >> > > >>>
>> >> > > >>
>> >> > >
>> >> >
>> >>
>> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
>> >> > > >>>>>>>>> @bci=5, line=478 (Compiled frame)
>> >> > > >>>>>>>>> -
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>>
>> >> > > >>>>>>
>> >> > > >>>>
>> >> > > >>>
>> >> > > >>
>> >> > >
>> >> >
>> >>
>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>> >> > > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
>> >> > > >>>>>>>>> -
>> >> > > >> scala.collection.mutable.WrappedArray.foreach(scala.Function1)
>> >> > > >>>>>>>>> @bci=2,
>> >> > > >>>>>>>>> line=34 (Compiled frame)
>> >> > > >>>>>>>>> -
>> >> org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
>> >> > > >>>>>> @bci=19,
>> >> > > >>>>>>>>> line=478 (Interpreted frame)
>> >> > > >>>>>>>>> -
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>>
>> >> > > >>>>>>
>> >> > > >>>>
>> >> > > >>>
>> >> > > >>
>> >> > >
>> >> >
>> >>
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
>> >> > > >>>>>>>>> @bci=14, line=141 (Interpreted frame)
>> >> > > >>>>>>>>> -
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>>
>> >> > > >>>>>>
>> >> > > >>>>
>> >> > > >>>
>> >> > > >>
>> >> > >
>> >> >
>> >>
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
>> >> > > >>>>>>>>> @bci=5, line=139 (Interpreted frame)
>> >> > > >>>>>>>>> -
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>>
>> >> > > >>>>>>
>> >> > > >>>>
>> >> > > >>>
>> >> > > >>
>> >> > >
>> >> >
>> >>
>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>> >> > > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
>> >> > > >>>>>>>>> -
>> >> > > >>>
>> scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
>> >> > > >>>>>>>>> @bci=2,
>> >> > > >>>>>>>>> line=108 (Interpreted frame)
>> >> > > >>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1.run()
>> >> > > >>> @bci=39,
>> >> > > >>>>>>>>> line=139 (Interpreted frame)
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> I think what happened here is that thread 14308 received
>> the
>> >> > > >> akka
>> >> > > >>>>>>>>> "shutdown" message and called System.exit().  This
>> started
>> >> > > >> thread
>> >> > > >>>>>> 3865,
>> >> > > >>>>>>>>> which is the JVM shutting itself down.  Part of that
>> process
>> >> is
>> >> > > >>>>>> running
>> >> > > >>>>>>>>> the
>> >> > > >>>>>>>>> shutdown hooks, so it started thread 3987.  That thread
>> is
>> >> the
>> >> > > >>>>>> shutdown
>> >> > > >>>>>>>>> hook from addShutdownHook() in DiskBlockManager.scala,
>> which
>> >> > > >>> looks
>> >> > > >>>>>> like
>> >> > > >>>>>>>>> this:
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>>  private def addShutdownHook() {
>> >> > > >>>>>>>>>    localDirs.foreach(localDir =>
>> >> > > >>>>>>>>> Utils.registerShutdownDeleteDir(localDir))
>> >> > > >>>>>>>>>    Runtime.getRuntime.addShutdownHook(new Thread("delete
>> >> Spark
>> >> > > >>>> local
>> >> > > >>>>>>>>> dirs") {
>> >> > > >>>>>>>>>      override def run() {
>> >> > > >>>>>>>>>        logDebug("Shutdown hook called")
>> >> > > >>>>>>>>>        localDirs.foreach { localDir =>
>> >> > > >>>>>>>>>          try {
>> >> > > >>>>>>>>>            if
>> (!Utils.hasRootAsShutdownDeleteDir(localDir))
>> >> > > >>>>>>>>> Utils.deleteRecursively(localDir)
>> >> > > >>>>>>>>>          } catch {
>> >> > > >>>>>>>>>            case t: Throwable =>
>> >> > > >>>>>>>>>              logError("Exception while deleting local
>> spark
>> >> > > >> dir:
>> >> > > >>>> " +
>> >> > > >>>>>>>>> localDir, t)
>> >> > > >>>>>>>>>          }
>> >> > > >>>>>>>>>        }
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>>        if (shuffleSender != null) {
>> >> > > >>>>>>>>>          shuffleSender.stop()
>> >> > > >>>>>>>>>        }
>> >> > > >>>>>>>>>      }
>> >> > > >>>>>>>>>    })
>> >> > > >>>>>>>>>  }
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> It goes through and deletes the directories recursively.
>>  I
>> >> was
>> >> > > >>>>>> thinking
>> >> > > >>>>>>>>> there might be some issues with concurrently-running
>> shutdown
>> >> > > >>> hooks
>> >> > > >>>>>>>>> deleting things out from underneath each other (shutdown
>> hook
>> >> > > >>>> javadocs
>> >> > > >>>>>>>>> say
>> >> > > >>>>>>>>> they're all started in parallel if multiple hooks are
>> added)
>> >> > > >>>> causing
>> >> > > >>>>>> the
>> >> > > >>>>>>>>> File.list() in that last thread to take quite some time.
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> While I was looking through the stacktrace the JVM
>> finally
>> >> > > >> exited
>> >> > > >>>>>> (after
>> >> > > >>>>>>>>> 15-20min at least) so I won't be able to debug more until
>> >> this
>> >> > > >>> bug
>> >> > > >>>>>>>>> strikes
>> >> > > >>>>>>>>> again.
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> Any ideas on what might be going on here?
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> Thanks!
>> >> > > >>>>>>>>> Andrew
>> >> > > >>>>>>>
>> >> > > >>>>>>>
>> >> > > >>>>>>
>> >> > > >>>>
>> >> > > >>>
>> >> > > >>
>> >> > >
>> >> > >
>> >> >
>> >>
>>
>
>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Andrew Ash <an...@andrewash.com>.
Agreed.  Also I'm happy to test any patches since I have a consistent repro
now (see one of my first responses in this thread)


On Fri, Feb 7, 2014 at 12:51 AM, Mridul Muralidharan <mr...@gmail.com>wrote:

> This looks like the most reasonable approach to resolve this !
>
> Regards,
> Mridul
>
>
> On Fri, Feb 7, 2014 at 1:43 PM, Tathagata Das
> <ta...@gmail.com> wrote:
> > Or we can try adding a shutdown hook in the
> > Executor<
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L127
> >to
> > call threadPool.shutdownNow(). May have to catch the
> > InterruptedException and handle it gracefully out
> > here<
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L255
> >
> > .
> >
> > TD
> >
> >
> > On Thu, Feb 6, 2014 at 11:49 PM, Andrew Ash <an...@andrewash.com>
> wrote:
> >
> >> I think we can enumerate all current threads with the ThreadMXBean,
> filter
> >> to those threads with the name of executor pool in them, and interrupt
> >> them.
> >>
> >>
> >>
> http://docs.oracle.com/javase/6/docs/api/java/lang/management/ManagementFactory.html#getThreadMXBean%28%29
> >>
> >> The executor threads are currently named according to the pattern
> "Executor
> >> task launch worker-X"
> >>
> >>
> >> On Thu, Feb 6, 2014 at 11:45 PM, Tathagata Das
> >> <ta...@gmail.com>wrote:
> >>
> >> > That definitely sound more reliable. Worth trying out if there is a
> >> > reliable way of reproducing the deadlock-like scenario.
> >> >
> >> > TD
> >> >
> >> >
> >> > On Thu, Feb 6, 2014 at 11:38 PM, Matei Zaharia <
> matei.zaharia@gmail.com
> >> > >wrote:
> >> >
> >> > > I don't think we necessarily want to do this through the
> DAGScheduler
> >> > > because the worker might also shut down due to some unusual
> termination
> >> > > condition, like the driver node crashing. Can't we do it at the top
> of
> >> > the
> >> > > shutdown hook instead? If all the threads are in the same thread
> pool
> >> it
> >> > > might be possible to interrupt or stop the whole pool.
> >> > >
> >> > > Matei
> >> > >
> >> > > On Feb 6, 2014, at 11:30 PM, Andrew Ash <an...@andrewash.com>
> wrote:
> >> > >
> >> > > > That's genius.  Of course when a worker is told to shutdown it
> should
> >> > > > interrupt its worker threads -- I think that would address this
> >> issue.
> >> > > >
> >> > > > Are you thinking to put
> >> > > >
> >> > > > running.map(_.jobId).foreach { handleJobCancellation }
> >> > > >
> >> > > > at the top of the StopDAGScheduler block?
> >> > > >
> >> > > >
> >> > > > On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das
> >> > > > <ta...@gmail.com>wrote:
> >> > > >
> >> > > >> Its highly likely that the executor with the threadpool that runs
> >> the
> >> > > tasks
> >> > > >> are the only set of threads that writes to disk. The tasks are
> >> > designed
> >> > > to
> >> > > >> be interrupted when the corresponding job is cancelled. So a
> >> > reasonably
> >> > > >> simple way could be to actually cancel the currently active jobs,
> >> > which
> >> > > >> would send the signal to the worker to stop the tasks. Currently,
> >> the
> >> > > >> DAGScheduler<
> >> > > >>
> >> > >
> >> >
> >>
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610
> >> > > >>> does
> >> > > >> not seem to actually cancel the jobs, only mark them as failed.
> So
> >> it
> >> > > >> may be a simple addition.
> >> > > >>
> >> > > >> There may be some complications with the external spilling of
> >> shuffle
> >> > > data
> >> > > >> to disk not stopping immediately when the task is marked for
> >> killing.
> >> > > Gotta
> >> > > >> try it out.
> >> > > >>
> >> > > >> TD
> >> > > >>
> >> > > >> On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <
> andrew@andrewash.com>
> >> > > wrote:
> >> > > >>
> >> > > >>> There is probably just one threadpool that has task threads --
> is
> >> it
> >> > > >>> possible to enumerate and interrupt just those?  We may need to
> >> keep
> >> > > >> string
> >> > > >>> a reference to that threadpool through to the shutdown thread to
> >> make
> >> > > >> that
> >> > > >>> happen.
> >> > > >>>
> >> > > >>>
> >> > > >>> On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <
> >> > mridul@gmail.com
> >> > > >>>> wrote:
> >> > > >>>
> >> > > >>>> Ideally, interrupting the thread writing to disk should be
> >> > sufficient
> >> > > >>>> - though since we are in middle of shutdown when this is
> >> happening,
> >> > it
> >> > > >>>> is best case effort anyway.
> >> > > >>>> Identifying which threads to interrupt will be interesting
> since
> >> > most
> >> > > >>>> of them are driven by threadpool's and we cant list all threads
> >> and
> >> > > >>>> interrupt all of them !
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> Regards,
> >> > > >>>> Mridul
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <
> andrew@andrewash.com>
> >> > > >> wrote:
> >> > > >>>>> I think the solution where we stop the writing threads and
> then
> >> let
> >> > > >> the
> >> > > >>>>> deleting threads completely clean up is the best option since
> the
> >> > > >> final
> >> > > >>>>> state doesn't have half-deleted temp dirs scattered across the
> >> > > >> cluster.
> >> > > >>>>>
> >> > > >>>>> How feasible do you think it'd be to interrupt the other
> threads?
> >> > > >>>>>
> >> > > >>>>>
> >> > > >>>>> On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <
> >> > > >> mridul@gmail.com
> >> > > >>>>> wrote:
> >> > > >>>>>
> >> > > >>>>>> Looks like a pathological corner case here - where the the
> >> delete
> >> > > >>>>>> thread is not getting run while the OS is busy prioritizing
> the
> >> > > >> thread
> >> > > >>>>>> writing data (probably with heavy gc too).
> >> > > >>>>>> Ideally, the delete thread would list files, remove them and
> >> then
> >> > > >> fail
> >> > > >>>>>> when it tries to remove the non empty directory (since other
> >> > thread
> >> > > >>>>>> might be creating more in parallel).
> >> > > >>>>>>
> >> > > >>>>>>
> >> > > >>>>>> Regards,
> >> > > >>>>>> Mridul
> >> > > >>>>>>
> >> > > >>>>>>
> >> > > >>>>>> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <
> >> andrew@andrewash.com>
> >> > > >>>> wrote:
> >> > > >>>>>>> Got a repro locally on my MBP (the other was on a CentOS
> >> > machine).
> >> > > >>>>>>>
> >> > > >>>>>>> Build spark, run a master and a worker with the
> >> sbin/start-all.sh
> >> > > >>>> script,
> >> > > >>>>>>> then run this in a shell:
> >> > > >>>>>>>
> >> > > >>>>>>> import org.apache.spark.storage.StorageLevel._
> >> > > >>>>>>> val s = sc.parallelize(1 to
> >> > > >>> 1000000000).persist(MEMORY_AND_DISK_SER);
> >> > > >>>>>>> s.count
> >> > > >>>>>>>
> >> > > >>>>>>> After about a minute, this line appears in the shell logging
> >> > > >> output:
> >> > > >>>>>>>
> >> > > >>>>>>> 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing
> >> > > >>> BlockManager
> >> > > >>>>>>> BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with
> no
> >> > > >> recent
> >> > > >>>>>> heart
> >> > > >>>>>>> beats: 57510ms exceeds 45000ms
> >> > > >>>>>>>
> >> > > >>>>>>> Ctrl-C the shell.  In jps there is now a worker, a master,
> and
> >> a
> >> > > >>>>>>> CoarseGrainedExecutorBackend.
> >> > > >>>>>>>
> >> > > >>>>>>> Run jstack on the CGEBackend JVM, and I got the attached
> >> > > >>> stacktraces.
> >> > > >>>> I
> >> > > >>>>>>> waited around for 15min then kill -9'd the JVM and restarted
> >> the
> >> > > >>>> process.
> >> > > >>>>>>>
> >> > > >>>>>>> I wonder if what's happening here is that the threads that
> are
> >> > > >>> spewing
> >> > > >>>>>> data
> >> > > >>>>>>> to disk (as that parallelize and persist would do) can
> write to
> >> > > >> disk
> >> > > >>>>>> faster
> >> > > >>>>>>> than the cleanup threads can delete from disk.
> >> > > >>>>>>>
> >> > > >>>>>>> What do you think of that theory?
> >> > > >>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>> Andrew
> >> > > >>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>> On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <
> >> > > >>> mridul@gmail.com
> >> > > >>>>>
> >> > > >>>>>>> wrote:
> >> > > >>>>>>>>
> >> > > >>>>>>>> shutdown hooks should not take 15 mins are you mentioned !
> >> > > >>>>>>>> On the other hand, how busy was your disk when this was
> >> > > >> happening ?
> >> > > >>>>>>>> (either due to spark or something else ?)
> >> > > >>>>>>>>
> >> > > >>>>>>>> It might just be that there was a lot of stuff to remove ?
> >> > > >>>>>>>>
> >> > > >>>>>>>> Regards,
> >> > > >>>>>>>> Mridul
> >> > > >>>>>>>>
> >> > > >>>>>>>>
> >> > > >>>>>>>> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <
> >> > andrew@andrewash.com
> >> > > >>>
> >> > > >>>>>> wrote:
> >> > > >>>>>>>>> Hi Spark devs,
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> Occasionally when hitting Ctrl-C in the scala spark shell
> on
> >> > > >>> 0.9.0
> >> > > >>>> one
> >> > > >>>>>>>>> of
> >> > > >>>>>>>>> my workers goes dead in the spark master UI.  I'm using
> the
> >> > > >>>> standalone
> >> > > >>>>>>>>> cluster and didn't ever see this while using 0.8.0 so I
> think
> >> > > >> it
> >> > > >>>> may
> >> > > >>>>>> be
> >> > > >>>>>>>>> a
> >> > > >>>>>>>>> regression.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> When I prod on the hung CoarseGrainedExecutorBackend JVM
> with
> >> > > >>>> jstack
> >> > > >>>>>> and
> >> > > >>>>>>>>> jmap -heap, it doesn't respond unless I add the -F force
> >> flag.
> >> > > >>> The
> >> > > >>>>>> heap
> >> > > >>>>>>>>> isn't full, but there are some interesting bits in the
> >> jstack.
> >> > > >>>> Poking
> >> > > >>>>>>>>> around a little, I think there may be some kind of
> deadlock
> >> in
> >> > > >>> the
> >> > > >>>>>>>>> shutdown
> >> > > >>>>>>>>> hooks.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> Below are the threads I think are most interesting:
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> Thread 14308: (state = BLOCKED)
> >> > > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212
> (Interpreted
> >> > > >>>> frame)
> >> > > >>>>>>>>> - java.lang.Runtime.exit(int) @bci=14, line=109
> (Interpreted
> >> > > >>>> frame)
> >> > > >>>>>>>>> - java.lang.System.exit(int) @bci=4, line=962 (Interpreted
> >> > > >>> frame)
> >> > > >>>>>>>>> -
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>
> >> > > >>>>>>
> >> > > >>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
> >> > > >>>>>>>>> scala.Function1) @bci=352, line=81 (Interpreted frame)
> >> > > >>>>>>>>> - akka.actor.ActorCell.receiveMessage(java.lang.Object)
> >> > > >> @bci=25,
> >> > > >>>>>>>>> line=498
> >> > > >>>>>>>>> (Interpreted frame)
> >> > > >>>>>>>>> - akka.actor.ActorCell.invoke(akka.dispatch.Envelope)
> >> @bci=39,
> >> > > >>>>>> line=456
> >> > > >>>>>>>>> (Interpreted frame)
> >> > > >>>>>>>>> - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24,
> >> > > >>>> line=237
> >> > > >>>>>>>>> (Interpreted frame)
> >> > > >>>>>>>>> - akka.dispatch.Mailbox.run() @bci=20, line=219
> (Interpreted
> >> > > >>>> frame)
> >> > > >>>>>>>>> -
> >> > > >>>>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
> >> > > >>>>>>>>> @bci=4, line=386 (Interpreted frame)
> >> > > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10,
> >> > > >>>> line=260
> >> > > >>>>>>>>> (Compiled frame)
> >> > > >>>>>>>>> -
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>
> >> > > >>>>>>
> >> > > >>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
> >> > > >>>>>>>>> @bci=10, line=1339 (Compiled frame)
> >> > > >>>>>>>>> -
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>
> >> > > >>>>>>
> >> > > >>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
> >> > > >>>>>>>>> @bci=11, line=1979 (Compiled frame)
> >> > > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinWorkerThread.run()
> >> > > >> @bci=14,
> >> > > >>>>>>>>> line=107
> >> > > >>>>>>>>> (Interpreted frame)
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> Thread 3865: (state = BLOCKED)
> >> > > >>>>>>>>> - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
> >> > > >>>>>>>>> - java.lang.Thread.join(long) @bci=38, line=1280
> (Interpreted
> >> > > >>>> frame)
> >> > > >>>>>>>>> - java.lang.Thread.join() @bci=2, line=1354 (Interpreted
> >> > > >> frame)
> >> > > >>>>>>>>> - java.lang.ApplicationShutdownHooks.runHooks() @bci=87,
> >> > > >>> line=106
> >> > > >>>>>>>>> (Interpreted frame)
> >> > > >>>>>>>>> - java.lang.ApplicationShutdownHooks$1.run() @bci=0,
> line=46
> >> > > >>>>>>>>> (Interpreted
> >> > > >>>>>>>>> frame)
> >> > > >>>>>>>>> - java.lang.Shutdown.runHooks() @bci=39, line=123
> >> (Interpreted
> >> > > >>>> frame)
> >> > > >>>>>>>>> - java.lang.Shutdown.sequence() @bci=26, line=167
> >> (Interpreted
> >> > > >>>> frame)
> >> > > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212
> (Interpreted
> >> > > >>>> frame)
> >> > > >>>>>>>>> - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8,
> >> > > >> line=52
> >> > > >>>>>>>>> (Interpreted frame)
> >> > > >>>>>>>>> - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted
> >> frame)
> >> > > >>>>>>>>> - java.lang.Thread.run() @bci=11, line=744 (Interpreted
> >> frame)
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> Thread 3987: (state = BLOCKED)
> >> > > >>>>>>>>> - java.io.UnixFileSystem.list(java.io.File) @bci=0
> >> > > >> (Interpreted
> >> > > >>>>>> frame)
> >> > > >>>>>>>>> - java.io.File.list() @bci=29, line=1116 (Interpreted
> frame)
> >> > > >>>>>>>>> - java.io.File.listFiles() @bci=1, line=1201 (Compiled
> frame)
> >> > > >>>>>>>>> -
> org.apache.spark.util.Utils$.listFilesSafely(java.io.File)
> >> > > >>>> @bci=1,
> >> > > >>>>>>>>> line=466 (Interpreted frame)
> >> > > >>>>>>>>> -
> >> org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> >> > > >>>>>> @bci=9,
> >> > > >>>>>>>>> line=478 (Compiled frame)
> >> > > >>>>>>>>> -
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>
> >> > > >>>>>>
> >> > > >>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
> >> > > >>>>>>>>> @bci=4, line=479 (Compiled frame)
> >> > > >>>>>>>>> -
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>
> >> > > >>>>>>
> >> > > >>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
> >> > > >>>>>>>>> @bci=5, line=478 (Compiled frame)
> >> > > >>>>>>>>> -
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>
> >> > > >>>>>>
> >> > > >>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> >> > > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
> >> > > >>>>>>>>> -
> >> > > >> scala.collection.mutable.WrappedArray.foreach(scala.Function1)
> >> > > >>>>>>>>> @bci=2,
> >> > > >>>>>>>>> line=34 (Compiled frame)
> >> > > >>>>>>>>> -
> >> org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> >> > > >>>>>> @bci=19,
> >> > > >>>>>>>>> line=478 (Interpreted frame)
> >> > > >>>>>>>>> -
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>
> >> > > >>>>>>
> >> > > >>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
> >> > > >>>>>>>>> @bci=14, line=141 (Interpreted frame)
> >> > > >>>>>>>>> -
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>
> >> > > >>>>>>
> >> > > >>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
> >> > > >>>>>>>>> @bci=5, line=139 (Interpreted frame)
> >> > > >>>>>>>>> -
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>
> >> > > >>>>>>
> >> > > >>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> >> > > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
> >> > > >>>>>>>>> -
> >> > > >>> scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
> >> > > >>>>>>>>> @bci=2,
> >> > > >>>>>>>>> line=108 (Interpreted frame)
> >> > > >>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1.run()
> >> > > >>> @bci=39,
> >> > > >>>>>>>>> line=139 (Interpreted frame)
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> I think what happened here is that thread 14308 received
> the
> >> > > >> akka
> >> > > >>>>>>>>> "shutdown" message and called System.exit().  This started
> >> > > >> thread
> >> > > >>>>>> 3865,
> >> > > >>>>>>>>> which is the JVM shutting itself down.  Part of that
> process
> >> is
> >> > > >>>>>> running
> >> > > >>>>>>>>> the
> >> > > >>>>>>>>> shutdown hooks, so it started thread 3987.  That thread is
> >> the
> >> > > >>>>>> shutdown
> >> > > >>>>>>>>> hook from addShutdownHook() in DiskBlockManager.scala,
> which
> >> > > >>> looks
> >> > > >>>>>> like
> >> > > >>>>>>>>> this:
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>  private def addShutdownHook() {
> >> > > >>>>>>>>>    localDirs.foreach(localDir =>
> >> > > >>>>>>>>> Utils.registerShutdownDeleteDir(localDir))
> >> > > >>>>>>>>>    Runtime.getRuntime.addShutdownHook(new Thread("delete
> >> Spark
> >> > > >>>> local
> >> > > >>>>>>>>> dirs") {
> >> > > >>>>>>>>>      override def run() {
> >> > > >>>>>>>>>        logDebug("Shutdown hook called")
> >> > > >>>>>>>>>        localDirs.foreach { localDir =>
> >> > > >>>>>>>>>          try {
> >> > > >>>>>>>>>            if
> (!Utils.hasRootAsShutdownDeleteDir(localDir))
> >> > > >>>>>>>>> Utils.deleteRecursively(localDir)
> >> > > >>>>>>>>>          } catch {
> >> > > >>>>>>>>>            case t: Throwable =>
> >> > > >>>>>>>>>              logError("Exception while deleting local
> spark
> >> > > >> dir:
> >> > > >>>> " +
> >> > > >>>>>>>>> localDir, t)
> >> > > >>>>>>>>>          }
> >> > > >>>>>>>>>        }
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>        if (shuffleSender != null) {
> >> > > >>>>>>>>>          shuffleSender.stop()
> >> > > >>>>>>>>>        }
> >> > > >>>>>>>>>      }
> >> > > >>>>>>>>>    })
> >> > > >>>>>>>>>  }
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> It goes through and deletes the directories recursively.
>  I
> >> was
> >> > > >>>>>> thinking
> >> > > >>>>>>>>> there might be some issues with concurrently-running
> shutdown
> >> > > >>> hooks
> >> > > >>>>>>>>> deleting things out from underneath each other (shutdown
> hook
> >> > > >>>> javadocs
> >> > > >>>>>>>>> say
> >> > > >>>>>>>>> they're all started in parallel if multiple hooks are
> added)
> >> > > >>>> causing
> >> > > >>>>>> the
> >> > > >>>>>>>>> File.list() in that last thread to take quite some time.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> While I was looking through the stacktrace the JVM finally
> >> > > >> exited
> >> > > >>>>>> (after
> >> > > >>>>>>>>> 15-20min at least) so I won't be able to debug more until
> >> this
> >> > > >>> bug
> >> > > >>>>>>>>> strikes
> >> > > >>>>>>>>> again.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> Any ideas on what might be going on here?
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> Thanks!
> >> > > >>>>>>>>> Andrew
> >> > > >>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>
> >> > > >>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> > >
> >> >
> >>
>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Mridul Muralidharan <mr...@gmail.com>.
This looks like the most reasonable approach to resolve this !

Regards,
Mridul


On Fri, Feb 7, 2014 at 1:43 PM, Tathagata Das
<ta...@gmail.com> wrote:
> Or we can try adding a shutdown hook in the
> Executor<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L127>to
> call threadPool.shutdownNow(). May have to catch the
> InterruptedException and handle it gracefully out
> here<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L255>
> .
>
> TD
>
>
> On Thu, Feb 6, 2014 at 11:49 PM, Andrew Ash <an...@andrewash.com> wrote:
>
>> I think we can enumerate all current threads with the ThreadMXBean, filter
>> to those threads with the name of executor pool in them, and interrupt
>> them.
>>
>>
>> http://docs.oracle.com/javase/6/docs/api/java/lang/management/ManagementFactory.html#getThreadMXBean%28%29
>>
>> The executor threads are currently named according to the pattern "Executor
>> task launch worker-X"
>>
>>
>> On Thu, Feb 6, 2014 at 11:45 PM, Tathagata Das
>> <ta...@gmail.com>wrote:
>>
>> > That definitely sound more reliable. Worth trying out if there is a
>> > reliable way of reproducing the deadlock-like scenario.
>> >
>> > TD
>> >
>> >
>> > On Thu, Feb 6, 2014 at 11:38 PM, Matei Zaharia <matei.zaharia@gmail.com
>> > >wrote:
>> >
>> > > I don't think we necessarily want to do this through the DAGScheduler
>> > > because the worker might also shut down due to some unusual termination
>> > > condition, like the driver node crashing. Can't we do it at the top of
>> > the
>> > > shutdown hook instead? If all the threads are in the same thread pool
>> it
>> > > might be possible to interrupt or stop the whole pool.
>> > >
>> > > Matei
>> > >
>> > > On Feb 6, 2014, at 11:30 PM, Andrew Ash <an...@andrewash.com> wrote:
>> > >
>> > > > That's genius.  Of course when a worker is told to shutdown it should
>> > > > interrupt its worker threads -- I think that would address this
>> issue.
>> > > >
>> > > > Are you thinking to put
>> > > >
>> > > > running.map(_.jobId).foreach { handleJobCancellation }
>> > > >
>> > > > at the top of the StopDAGScheduler block?
>> > > >
>> > > >
>> > > > On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das
>> > > > <ta...@gmail.com>wrote:
>> > > >
>> > > >> Its highly likely that the executor with the threadpool that runs
>> the
>> > > tasks
>> > > >> are the only set of threads that writes to disk. The tasks are
>> > designed
>> > > to
>> > > >> be interrupted when the corresponding job is cancelled. So a
>> > reasonably
>> > > >> simple way could be to actually cancel the currently active jobs,
>> > which
>> > > >> would send the signal to the worker to stop the tasks. Currently,
>> the
>> > > >> DAGScheduler<
>> > > >>
>> > >
>> >
>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610
>> > > >>> does
>> > > >> not seem to actually cancel the jobs, only mark them as failed. So
>> it
>> > > >> may be a simple addition.
>> > > >>
>> > > >> There may be some complications with the external spilling of
>> shuffle
>> > > data
>> > > >> to disk not stopping immediately when the task is marked for
>> killing.
>> > > Gotta
>> > > >> try it out.
>> > > >>
>> > > >> TD
>> > > >>
>> > > >> On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <an...@andrewash.com>
>> > > wrote:
>> > > >>
>> > > >>> There is probably just one threadpool that has task threads -- is
>> it
>> > > >>> possible to enumerate and interrupt just those?  We may need to
>> keep
>> > > >> string
>> > > >>> a reference to that threadpool through to the shutdown thread to
>> make
>> > > >> that
>> > > >>> happen.
>> > > >>>
>> > > >>>
>> > > >>> On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <
>> > mridul@gmail.com
>> > > >>>> wrote:
>> > > >>>
>> > > >>>> Ideally, interrupting the thread writing to disk should be
>> > sufficient
>> > > >>>> - though since we are in middle of shutdown when this is
>> happening,
>> > it
>> > > >>>> is best case effort anyway.
>> > > >>>> Identifying which threads to interrupt will be interesting since
>> > most
>> > > >>>> of them are driven by threadpool's and we cant list all threads
>> and
>> > > >>>> interrupt all of them !
>> > > >>>>
>> > > >>>>
>> > > >>>> Regards,
>> > > >>>> Mridul
>> > > >>>>
>> > > >>>>
>> > > >>>> On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <an...@andrewash.com>
>> > > >> wrote:
>> > > >>>>> I think the solution where we stop the writing threads and then
>> let
>> > > >> the
>> > > >>>>> deleting threads completely clean up is the best option since the
>> > > >> final
>> > > >>>>> state doesn't have half-deleted temp dirs scattered across the
>> > > >> cluster.
>> > > >>>>>
>> > > >>>>> How feasible do you think it'd be to interrupt the other threads?
>> > > >>>>>
>> > > >>>>>
>> > > >>>>> On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <
>> > > >> mridul@gmail.com
>> > > >>>>> wrote:
>> > > >>>>>
>> > > >>>>>> Looks like a pathological corner case here - where the the
>> delete
>> > > >>>>>> thread is not getting run while the OS is busy prioritizing the
>> > > >> thread
>> > > >>>>>> writing data (probably with heavy gc too).
>> > > >>>>>> Ideally, the delete thread would list files, remove them and
>> then
>> > > >> fail
>> > > >>>>>> when it tries to remove the non empty directory (since other
>> > thread
>> > > >>>>>> might be creating more in parallel).
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>> Regards,
>> > > >>>>>> Mridul
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <
>> andrew@andrewash.com>
>> > > >>>> wrote:
>> > > >>>>>>> Got a repro locally on my MBP (the other was on a CentOS
>> > machine).
>> > > >>>>>>>
>> > > >>>>>>> Build spark, run a master and a worker with the
>> sbin/start-all.sh
>> > > >>>> script,
>> > > >>>>>>> then run this in a shell:
>> > > >>>>>>>
>> > > >>>>>>> import org.apache.spark.storage.StorageLevel._
>> > > >>>>>>> val s = sc.parallelize(1 to
>> > > >>> 1000000000).persist(MEMORY_AND_DISK_SER);
>> > > >>>>>>> s.count
>> > > >>>>>>>
>> > > >>>>>>> After about a minute, this line appears in the shell logging
>> > > >> output:
>> > > >>>>>>>
>> > > >>>>>>> 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing
>> > > >>> BlockManager
>> > > >>>>>>> BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no
>> > > >> recent
>> > > >>>>>> heart
>> > > >>>>>>> beats: 57510ms exceeds 45000ms
>> > > >>>>>>>
>> > > >>>>>>> Ctrl-C the shell.  In jps there is now a worker, a master, and
>> a
>> > > >>>>>>> CoarseGrainedExecutorBackend.
>> > > >>>>>>>
>> > > >>>>>>> Run jstack on the CGEBackend JVM, and I got the attached
>> > > >>> stacktraces.
>> > > >>>> I
>> > > >>>>>>> waited around for 15min then kill -9'd the JVM and restarted
>> the
>> > > >>>> process.
>> > > >>>>>>>
>> > > >>>>>>> I wonder if what's happening here is that the threads that are
>> > > >>> spewing
>> > > >>>>>> data
>> > > >>>>>>> to disk (as that parallelize and persist would do) can write to
>> > > >> disk
>> > > >>>>>> faster
>> > > >>>>>>> than the cleanup threads can delete from disk.
>> > > >>>>>>>
>> > > >>>>>>> What do you think of that theory?
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>> Andrew
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>> On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <
>> > > >>> mridul@gmail.com
>> > > >>>>>
>> > > >>>>>>> wrote:
>> > > >>>>>>>>
>> > > >>>>>>>> shutdown hooks should not take 15 mins are you mentioned !
>> > > >>>>>>>> On the other hand, how busy was your disk when this was
>> > > >> happening ?
>> > > >>>>>>>> (either due to spark or something else ?)
>> > > >>>>>>>>
>> > > >>>>>>>> It might just be that there was a lot of stuff to remove ?
>> > > >>>>>>>>
>> > > >>>>>>>> Regards,
>> > > >>>>>>>> Mridul
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <
>> > andrew@andrewash.com
>> > > >>>
>> > > >>>>>> wrote:
>> > > >>>>>>>>> Hi Spark devs,
>> > > >>>>>>>>>
>> > > >>>>>>>>> Occasionally when hitting Ctrl-C in the scala spark shell on
>> > > >>> 0.9.0
>> > > >>>> one
>> > > >>>>>>>>> of
>> > > >>>>>>>>> my workers goes dead in the spark master UI.  I'm using the
>> > > >>>> standalone
>> > > >>>>>>>>> cluster and didn't ever see this while using 0.8.0 so I think
>> > > >> it
>> > > >>>> may
>> > > >>>>>> be
>> > > >>>>>>>>> a
>> > > >>>>>>>>> regression.
>> > > >>>>>>>>>
>> > > >>>>>>>>> When I prod on the hung CoarseGrainedExecutorBackend JVM with
>> > > >>>> jstack
>> > > >>>>>> and
>> > > >>>>>>>>> jmap -heap, it doesn't respond unless I add the -F force
>> flag.
>> > > >>> The
>> > > >>>>>> heap
>> > > >>>>>>>>> isn't full, but there are some interesting bits in the
>> jstack.
>> > > >>>> Poking
>> > > >>>>>>>>> around a little, I think there may be some kind of deadlock
>> in
>> > > >>> the
>> > > >>>>>>>>> shutdown
>> > > >>>>>>>>> hooks.
>> > > >>>>>>>>>
>> > > >>>>>>>>> Below are the threads I think are most interesting:
>> > > >>>>>>>>>
>> > > >>>>>>>>> Thread 14308: (state = BLOCKED)
>> > > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
>> > > >>>> frame)
>> > > >>>>>>>>> - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted
>> > > >>>> frame)
>> > > >>>>>>>>> - java.lang.System.exit(int) @bci=4, line=962 (Interpreted
>> > > >>> frame)
>> > > >>>>>>>>> -
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
>> > > >>>>>>>>> scala.Function1) @bci=352, line=81 (Interpreted frame)
>> > > >>>>>>>>> - akka.actor.ActorCell.receiveMessage(java.lang.Object)
>> > > >> @bci=25,
>> > > >>>>>>>>> line=498
>> > > >>>>>>>>> (Interpreted frame)
>> > > >>>>>>>>> - akka.actor.ActorCell.invoke(akka.dispatch.Envelope)
>> @bci=39,
>> > > >>>>>> line=456
>> > > >>>>>>>>> (Interpreted frame)
>> > > >>>>>>>>> - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24,
>> > > >>>> line=237
>> > > >>>>>>>>> (Interpreted frame)
>> > > >>>>>>>>> - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted
>> > > >>>> frame)
>> > > >>>>>>>>> -
>> > > >>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
>> > > >>>>>>>>> @bci=4, line=386 (Interpreted frame)
>> > > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10,
>> > > >>>> line=260
>> > > >>>>>>>>> (Compiled frame)
>> > > >>>>>>>>> -
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
>> > > >>>>>>>>> @bci=10, line=1339 (Compiled frame)
>> > > >>>>>>>>> -
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
>> > > >>>>>>>>> @bci=11, line=1979 (Compiled frame)
>> > > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinWorkerThread.run()
>> > > >> @bci=14,
>> > > >>>>>>>>> line=107
>> > > >>>>>>>>> (Interpreted frame)
>> > > >>>>>>>>>
>> > > >>>>>>>>> Thread 3865: (state = BLOCKED)
>> > > >>>>>>>>> - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
>> > > >>>>>>>>> - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted
>> > > >>>> frame)
>> > > >>>>>>>>> - java.lang.Thread.join() @bci=2, line=1354 (Interpreted
>> > > >> frame)
>> > > >>>>>>>>> - java.lang.ApplicationShutdownHooks.runHooks() @bci=87,
>> > > >>> line=106
>> > > >>>>>>>>> (Interpreted frame)
>> > > >>>>>>>>> - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46
>> > > >>>>>>>>> (Interpreted
>> > > >>>>>>>>> frame)
>> > > >>>>>>>>> - java.lang.Shutdown.runHooks() @bci=39, line=123
>> (Interpreted
>> > > >>>> frame)
>> > > >>>>>>>>> - java.lang.Shutdown.sequence() @bci=26, line=167
>> (Interpreted
>> > > >>>> frame)
>> > > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
>> > > >>>> frame)
>> > > >>>>>>>>> - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8,
>> > > >> line=52
>> > > >>>>>>>>> (Interpreted frame)
>> > > >>>>>>>>> - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted
>> frame)
>> > > >>>>>>>>> - java.lang.Thread.run() @bci=11, line=744 (Interpreted
>> frame)
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>> Thread 3987: (state = BLOCKED)
>> > > >>>>>>>>> - java.io.UnixFileSystem.list(java.io.File) @bci=0
>> > > >> (Interpreted
>> > > >>>>>> frame)
>> > > >>>>>>>>> - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
>> > > >>>>>>>>> - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
>> > > >>>>>>>>> - org.apache.spark.util.Utils$.listFilesSafely(java.io.File)
>> > > >>>> @bci=1,
>> > > >>>>>>>>> line=466 (Interpreted frame)
>> > > >>>>>>>>> -
>> org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
>> > > >>>>>> @bci=9,
>> > > >>>>>>>>> line=478 (Compiled frame)
>> > > >>>>>>>>> -
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
>> > > >>>>>>>>> @bci=4, line=479 (Compiled frame)
>> > > >>>>>>>>> -
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
>> > > >>>>>>>>> @bci=5, line=478 (Compiled frame)
>> > > >>>>>>>>> -
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>> > > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
>> > > >>>>>>>>> -
>> > > >> scala.collection.mutable.WrappedArray.foreach(scala.Function1)
>> > > >>>>>>>>> @bci=2,
>> > > >>>>>>>>> line=34 (Compiled frame)
>> > > >>>>>>>>> -
>> org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
>> > > >>>>>> @bci=19,
>> > > >>>>>>>>> line=478 (Interpreted frame)
>> > > >>>>>>>>> -
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
>> > > >>>>>>>>> @bci=14, line=141 (Interpreted frame)
>> > > >>>>>>>>> -
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
>> > > >>>>>>>>> @bci=5, line=139 (Interpreted frame)
>> > > >>>>>>>>> -
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>> > > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
>> > > >>>>>>>>> -
>> > > >>> scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
>> > > >>>>>>>>> @bci=2,
>> > > >>>>>>>>> line=108 (Interpreted frame)
>> > > >>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1.run()
>> > > >>> @bci=39,
>> > > >>>>>>>>> line=139 (Interpreted frame)
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>> I think what happened here is that thread 14308 received the
>> > > >> akka
>> > > >>>>>>>>> "shutdown" message and called System.exit().  This started
>> > > >> thread
>> > > >>>>>> 3865,
>> > > >>>>>>>>> which is the JVM shutting itself down.  Part of that process
>> is
>> > > >>>>>> running
>> > > >>>>>>>>> the
>> > > >>>>>>>>> shutdown hooks, so it started thread 3987.  That thread is
>> the
>> > > >>>>>> shutdown
>> > > >>>>>>>>> hook from addShutdownHook() in DiskBlockManager.scala, which
>> > > >>> looks
>> > > >>>>>> like
>> > > >>>>>>>>> this:
>> > > >>>>>>>>>
>> > > >>>>>>>>>  private def addShutdownHook() {
>> > > >>>>>>>>>    localDirs.foreach(localDir =>
>> > > >>>>>>>>> Utils.registerShutdownDeleteDir(localDir))
>> > > >>>>>>>>>    Runtime.getRuntime.addShutdownHook(new Thread("delete
>> Spark
>> > > >>>> local
>> > > >>>>>>>>> dirs") {
>> > > >>>>>>>>>      override def run() {
>> > > >>>>>>>>>        logDebug("Shutdown hook called")
>> > > >>>>>>>>>        localDirs.foreach { localDir =>
>> > > >>>>>>>>>          try {
>> > > >>>>>>>>>            if (!Utils.hasRootAsShutdownDeleteDir(localDir))
>> > > >>>>>>>>> Utils.deleteRecursively(localDir)
>> > > >>>>>>>>>          } catch {
>> > > >>>>>>>>>            case t: Throwable =>
>> > > >>>>>>>>>              logError("Exception while deleting local spark
>> > > >> dir:
>> > > >>>> " +
>> > > >>>>>>>>> localDir, t)
>> > > >>>>>>>>>          }
>> > > >>>>>>>>>        }
>> > > >>>>>>>>>
>> > > >>>>>>>>>        if (shuffleSender != null) {
>> > > >>>>>>>>>          shuffleSender.stop()
>> > > >>>>>>>>>        }
>> > > >>>>>>>>>      }
>> > > >>>>>>>>>    })
>> > > >>>>>>>>>  }
>> > > >>>>>>>>>
>> > > >>>>>>>>> It goes through and deletes the directories recursively.  I
>> was
>> > > >>>>>> thinking
>> > > >>>>>>>>> there might be some issues with concurrently-running shutdown
>> > > >>> hooks
>> > > >>>>>>>>> deleting things out from underneath each other (shutdown hook
>> > > >>>> javadocs
>> > > >>>>>>>>> say
>> > > >>>>>>>>> they're all started in parallel if multiple hooks are added)
>> > > >>>> causing
>> > > >>>>>> the
>> > > >>>>>>>>> File.list() in that last thread to take quite some time.
>> > > >>>>>>>>>
>> > > >>>>>>>>> While I was looking through the stacktrace the JVM finally
>> > > >> exited
>> > > >>>>>> (after
>> > > >>>>>>>>> 15-20min at least) so I won't be able to debug more until
>> this
>> > > >>> bug
>> > > >>>>>>>>> strikes
>> > > >>>>>>>>> again.
>> > > >>>>>>>>>
>> > > >>>>>>>>> Any ideas on what might be going on here?
>> > > >>>>>>>>>
>> > > >>>>>>>>> Thanks!
>> > > >>>>>>>>> Andrew
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> > >
>> >
>>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Tathagata Das <ta...@gmail.com>.
I think if the threads in the threadpool catch and ignore
InterruptedException then those thread cant be stopped. So there is not
guarantee, but it will probably most of the time. Unless some user code
catches Interrupted exception.
We can probably first try to
kill<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L255>
the
currently running
tasks<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L130>,
which will make an attempt to "gracefully" shut them down. That probably
cannot be overriden by user code. Then use the threadpool.shutdownNow.
Double whammy!

TD


On Fri, Feb 7, 2014 at 12:21 AM, Andrew Ash <an...@andrewash.com> wrote:

> An additional shutdown hook to stop the threadpool is much more elegant
> than the name matching and thread interrupting I was thinking about.  That
> Javadoc looks like it's a best-effort shutdown and won't hard kill threads,
> but that's at least a step forward from current behavior.
>
>
> http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html#shutdownNow()
>
>
> On Fri, Feb 7, 2014 at 12:13 AM, Tathagata Das
> <ta...@gmail.com>wrote:
>
> > Or we can try adding a shutdown hook in the
> > Executor<
> >
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L127
> > >to
> > call threadPool.shutdownNow(). May have to catch the
> > InterruptedException and handle it gracefully out
> > here<
> >
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L255
> > >
> > .
> >
> > TD
> >
> >
> > On Thu, Feb 6, 2014 at 11:49 PM, Andrew Ash <an...@andrewash.com>
> wrote:
> >
> > > I think we can enumerate all current threads with the ThreadMXBean,
> > filter
> > > to those threads with the name of executor pool in them, and interrupt
> > > them.
> > >
> > >
> > >
> >
> http://docs.oracle.com/javase/6/docs/api/java/lang/management/ManagementFactory.html#getThreadMXBean%28%29
> > >
> > > The executor threads are currently named according to the pattern
> > "Executor
> > > task launch worker-X"
> > >
> > >
> > > On Thu, Feb 6, 2014 at 11:45 PM, Tathagata Das
> > > <ta...@gmail.com>wrote:
> > >
> > > > That definitely sound more reliable. Worth trying out if there is a
> > > > reliable way of reproducing the deadlock-like scenario.
> > > >
> > > > TD
> > > >
> > > >
> > > > On Thu, Feb 6, 2014 at 11:38 PM, Matei Zaharia <
> > matei.zaharia@gmail.com
> > > > >wrote:
> > > >
> > > > > I don't think we necessarily want to do this through the
> DAGScheduler
> > > > > because the worker might also shut down due to some unusual
> > termination
> > > > > condition, like the driver node crashing. Can't we do it at the top
> > of
> > > > the
> > > > > shutdown hook instead? If all the threads are in the same thread
> pool
> > > it
> > > > > might be possible to interrupt or stop the whole pool.
> > > > >
> > > > > Matei
> > > > >
> > > > > On Feb 6, 2014, at 11:30 PM, Andrew Ash <an...@andrewash.com>
> > wrote:
> > > > >
> > > > > > That's genius.  Of course when a worker is told to shutdown it
> > should
> > > > > > interrupt its worker threads -- I think that would address this
> > > issue.
> > > > > >
> > > > > > Are you thinking to put
> > > > > >
> > > > > > running.map(_.jobId).foreach { handleJobCancellation }
> > > > > >
> > > > > > at the top of the StopDAGScheduler block?
> > > > > >
> > > > > >
> > > > > > On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das
> > > > > > <ta...@gmail.com>wrote:
> > > > > >
> > > > > >> Its highly likely that the executor with the threadpool that
> runs
> > > the
> > > > > tasks
> > > > > >> are the only set of threads that writes to disk. The tasks are
> > > > designed
> > > > > to
> > > > > >> be interrupted when the corresponding job is cancelled. So a
> > > > reasonably
> > > > > >> simple way could be to actually cancel the currently active
> jobs,
> > > > which
> > > > > >> would send the signal to the worker to stop the tasks.
> Currently,
> > > the
> > > > > >> DAGScheduler<
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610
> > > > > >>> does
> > > > > >> not seem to actually cancel the jobs, only mark them as failed.
> So
> > > it
> > > > > >> may be a simple addition.
> > > > > >>
> > > > > >> There may be some complications with the external spilling of
> > > shuffle
> > > > > data
> > > > > >> to disk not stopping immediately when the task is marked for
> > > killing.
> > > > > Gotta
> > > > > >> try it out.
> > > > > >>
> > > > > >> TD
> > > > > >>
> > > > > >> On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <
> andrew@andrewash.com
> > >
> > > > > wrote:
> > > > > >>
> > > > > >>> There is probably just one threadpool that has task threads --
> is
> > > it
> > > > > >>> possible to enumerate and interrupt just those?  We may need to
> > > keep
> > > > > >> string
> > > > > >>> a reference to that threadpool through to the shutdown thread
> to
> > > make
> > > > > >> that
> > > > > >>> happen.
> > > > > >>>
> > > > > >>>
> > > > > >>> On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <
> > > > mridul@gmail.com
> > > > > >>>> wrote:
> > > > > >>>
> > > > > >>>> Ideally, interrupting the thread writing to disk should be
> > > > sufficient
> > > > > >>>> - though since we are in middle of shutdown when this is
> > > happening,
> > > > it
> > > > > >>>> is best case effort anyway.
> > > > > >>>> Identifying which threads to interrupt will be interesting
> since
> > > > most
> > > > > >>>> of them are driven by threadpool's and we cant list all
> threads
> > > and
> > > > > >>>> interrupt all of them !
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> Regards,
> > > > > >>>> Mridul
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <
> > andrew@andrewash.com>
> > > > > >> wrote:
> > > > > >>>>> I think the solution where we stop the writing threads and
> then
> > > let
> > > > > >> the
> > > > > >>>>> deleting threads completely clean up is the best option since
> > the
> > > > > >> final
> > > > > >>>>> state doesn't have half-deleted temp dirs scattered across
> the
> > > > > >> cluster.
> > > > > >>>>>
> > > > > >>>>> How feasible do you think it'd be to interrupt the other
> > threads?
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <
> > > > > >> mridul@gmail.com
> > > > > >>>>> wrote:
> > > > > >>>>>
> > > > > >>>>>> Looks like a pathological corner case here - where the the
> > > delete
> > > > > >>>>>> thread is not getting run while the OS is busy prioritizing
> > the
> > > > > >> thread
> > > > > >>>>>> writing data (probably with heavy gc too).
> > > > > >>>>>> Ideally, the delete thread would list files, remove them and
> > > then
> > > > > >> fail
> > > > > >>>>>> when it tries to remove the non empty directory (since other
> > > > thread
> > > > > >>>>>> might be creating more in parallel).
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> Regards,
> > > > > >>>>>> Mridul
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <
> > > andrew@andrewash.com>
> > > > > >>>> wrote:
> > > > > >>>>>>> Got a repro locally on my MBP (the other was on a CentOS
> > > > machine).
> > > > > >>>>>>>
> > > > > >>>>>>> Build spark, run a master and a worker with the
> > > sbin/start-all.sh
> > > > > >>>> script,
> > > > > >>>>>>> then run this in a shell:
> > > > > >>>>>>>
> > > > > >>>>>>> import org.apache.spark.storage.StorageLevel._
> > > > > >>>>>>> val s = sc.parallelize(1 to
> > > > > >>> 1000000000).persist(MEMORY_AND_DISK_SER);
> > > > > >>>>>>> s.count
> > > > > >>>>>>>
> > > > > >>>>>>> After about a minute, this line appears in the shell
> logging
> > > > > >> output:
> > > > > >>>>>>>
> > > > > >>>>>>> 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing
> > > > > >>> BlockManager
> > > > > >>>>>>> BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with
> no
> > > > > >> recent
> > > > > >>>>>> heart
> > > > > >>>>>>> beats: 57510ms exceeds 45000ms
> > > > > >>>>>>>
> > > > > >>>>>>> Ctrl-C the shell.  In jps there is now a worker, a master,
> > and
> > > a
> > > > > >>>>>>> CoarseGrainedExecutorBackend.
> > > > > >>>>>>>
> > > > > >>>>>>> Run jstack on the CGEBackend JVM, and I got the attached
> > > > > >>> stacktraces.
> > > > > >>>> I
> > > > > >>>>>>> waited around for 15min then kill -9'd the JVM and
> restarted
> > > the
> > > > > >>>> process.
> > > > > >>>>>>>
> > > > > >>>>>>> I wonder if what's happening here is that the threads that
> > are
> > > > > >>> spewing
> > > > > >>>>>> data
> > > > > >>>>>>> to disk (as that parallelize and persist would do) can
> write
> > to
> > > > > >> disk
> > > > > >>>>>> faster
> > > > > >>>>>>> than the cleanup threads can delete from disk.
> > > > > >>>>>>>
> > > > > >>>>>>> What do you think of that theory?
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>> Andrew
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>> On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <
> > > > > >>> mridul@gmail.com
> > > > > >>>>>
> > > > > >>>>>>> wrote:
> > > > > >>>>>>>>
> > > > > >>>>>>>> shutdown hooks should not take 15 mins are you mentioned !
> > > > > >>>>>>>> On the other hand, how busy was your disk when this was
> > > > > >> happening ?
> > > > > >>>>>>>> (either due to spark or something else ?)
> > > > > >>>>>>>>
> > > > > >>>>>>>> It might just be that there was a lot of stuff to remove ?
> > > > > >>>>>>>>
> > > > > >>>>>>>> Regards,
> > > > > >>>>>>>> Mridul
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <
> > > > andrew@andrewash.com
> > > > > >>>
> > > > > >>>>>> wrote:
> > > > > >>>>>>>>> Hi Spark devs,
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Occasionally when hitting Ctrl-C in the scala spark shell
> > on
> > > > > >>> 0.9.0
> > > > > >>>> one
> > > > > >>>>>>>>> of
> > > > > >>>>>>>>> my workers goes dead in the spark master UI.  I'm using
> the
> > > > > >>>> standalone
> > > > > >>>>>>>>> cluster and didn't ever see this while using 0.8.0 so I
> > think
> > > > > >> it
> > > > > >>>> may
> > > > > >>>>>> be
> > > > > >>>>>>>>> a
> > > > > >>>>>>>>> regression.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> When I prod on the hung CoarseGrainedExecutorBackend JVM
> > with
> > > > > >>>> jstack
> > > > > >>>>>> and
> > > > > >>>>>>>>> jmap -heap, it doesn't respond unless I add the -F force
> > > flag.
> > > > > >>> The
> > > > > >>>>>> heap
> > > > > >>>>>>>>> isn't full, but there are some interesting bits in the
> > > jstack.
> > > > > >>>> Poking
> > > > > >>>>>>>>> around a little, I think there may be some kind of
> deadlock
> > > in
> > > > > >>> the
> > > > > >>>>>>>>> shutdown
> > > > > >>>>>>>>> hooks.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Below are the threads I think are most interesting:
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Thread 14308: (state = BLOCKED)
> > > > > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212
> > (Interpreted
> > > > > >>>> frame)
> > > > > >>>>>>>>> - java.lang.Runtime.exit(int) @bci=14, line=109
> > (Interpreted
> > > > > >>>> frame)
> > > > > >>>>>>>>> - java.lang.System.exit(int) @bci=4, line=962
> (Interpreted
> > > > > >>> frame)
> > > > > >>>>>>>>> -
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > >
> > > >
> > >
> >
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
> > > > > >>>>>>>>> scala.Function1) @bci=352, line=81 (Interpreted frame)
> > > > > >>>>>>>>> - akka.actor.ActorCell.receiveMessage(java.lang.Object)
> > > > > >> @bci=25,
> > > > > >>>>>>>>> line=498
> > > > > >>>>>>>>> (Interpreted frame)
> > > > > >>>>>>>>> - akka.actor.ActorCell.invoke(akka.dispatch.Envelope)
> > > @bci=39,
> > > > > >>>>>> line=456
> > > > > >>>>>>>>> (Interpreted frame)
> > > > > >>>>>>>>> - akka.dispatch.Mailbox.processMailbox(int, long)
> @bci=24,
> > > > > >>>> line=237
> > > > > >>>>>>>>> (Interpreted frame)
> > > > > >>>>>>>>> - akka.dispatch.Mailbox.run() @bci=20, line=219
> > (Interpreted
> > > > > >>>> frame)
> > > > > >>>>>>>>> -
> > > > > >>>>
> > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
> > > > > >>>>>>>>> @bci=4, line=386 (Interpreted frame)
> > > > > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinTask.doExec()
> @bci=10,
> > > > > >>>> line=260
> > > > > >>>>>>>>> (Compiled frame)
> > > > > >>>>>>>>> -
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > >
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
> > > > > >>>>>>>>> @bci=10, line=1339 (Compiled frame)
> > > > > >>>>>>>>> -
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > >
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
> > > > > >>>>>>>>> @bci=11, line=1979 (Compiled frame)
> > > > > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinWorkerThread.run()
> > > > > >> @bci=14,
> > > > > >>>>>>>>> line=107
> > > > > >>>>>>>>> (Interpreted frame)
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Thread 3865: (state = BLOCKED)
> > > > > >>>>>>>>> - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
> > > > > >>>>>>>>> - java.lang.Thread.join(long) @bci=38, line=1280
> > (Interpreted
> > > > > >>>> frame)
> > > > > >>>>>>>>> - java.lang.Thread.join() @bci=2, line=1354 (Interpreted
> > > > > >> frame)
> > > > > >>>>>>>>> - java.lang.ApplicationShutdownHooks.runHooks() @bci=87,
> > > > > >>> line=106
> > > > > >>>>>>>>> (Interpreted frame)
> > > > > >>>>>>>>> - java.lang.ApplicationShutdownHooks$1.run() @bci=0,
> > line=46
> > > > > >>>>>>>>> (Interpreted
> > > > > >>>>>>>>> frame)
> > > > > >>>>>>>>> - java.lang.Shutdown.runHooks() @bci=39, line=123
> > > (Interpreted
> > > > > >>>> frame)
> > > > > >>>>>>>>> - java.lang.Shutdown.sequence() @bci=26, line=167
> > > (Interpreted
> > > > > >>>> frame)
> > > > > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212
> > (Interpreted
> > > > > >>>> frame)
> > > > > >>>>>>>>> - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8,
> > > > > >> line=52
> > > > > >>>>>>>>> (Interpreted frame)
> > > > > >>>>>>>>> - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted
> > > frame)
> > > > > >>>>>>>>> - java.lang.Thread.run() @bci=11, line=744 (Interpreted
> > > frame)
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Thread 3987: (state = BLOCKED)
> > > > > >>>>>>>>> - java.io.UnixFileSystem.list(java.io.File) @bci=0
> > > > > >> (Interpreted
> > > > > >>>>>> frame)
> > > > > >>>>>>>>> - java.io.File.list() @bci=29, line=1116 (Interpreted
> > frame)
> > > > > >>>>>>>>> - java.io.File.listFiles() @bci=1, line=1201 (Compiled
> > frame)
> > > > > >>>>>>>>> -
> > org.apache.spark.util.Utils$.listFilesSafely(java.io.File)
> > > > > >>>> @bci=1,
> > > > > >>>>>>>>> line=466 (Interpreted frame)
> > > > > >>>>>>>>> -
> > > org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> > > > > >>>>>> @bci=9,
> > > > > >>>>>>>>> line=478 (Compiled frame)
> > > > > >>>>>>>>> -
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > >
> > > >
> > >
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
> > > > > >>>>>>>>> @bci=4, line=479 (Compiled frame)
> > > > > >>>>>>>>> -
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > >
> > > >
> > >
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
> > > > > >>>>>>>>> @bci=5, line=478 (Compiled frame)
> > > > > >>>>>>>>> -
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > >
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > > > > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
> > > > > >>>>>>>>> -
> > > > > >> scala.collection.mutable.WrappedArray.foreach(scala.Function1)
> > > > > >>>>>>>>> @bci=2,
> > > > > >>>>>>>>> line=34 (Compiled frame)
> > > > > >>>>>>>>> -
> > > org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> > > > > >>>>>> @bci=19,
> > > > > >>>>>>>>> line=478 (Interpreted frame)
> > > > > >>>>>>>>> -
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > >
> > > >
> > >
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
> > > > > >>>>>>>>> @bci=14, line=141 (Interpreted frame)
> > > > > >>>>>>>>> -
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > >
> > > >
> > >
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
> > > > > >>>>>>>>> @bci=5, line=139 (Interpreted frame)
> > > > > >>>>>>>>> -
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > >
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > > > > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
> > > > > >>>>>>>>> -
> > > > > >>>
> scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
> > > > > >>>>>>>>> @bci=2,
> > > > > >>>>>>>>> line=108 (Interpreted frame)
> > > > > >>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1.run()
> > > > > >>> @bci=39,
> > > > > >>>>>>>>> line=139 (Interpreted frame)
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> I think what happened here is that thread 14308 received
> > the
> > > > > >> akka
> > > > > >>>>>>>>> "shutdown" message and called System.exit().  This
> started
> > > > > >> thread
> > > > > >>>>>> 3865,
> > > > > >>>>>>>>> which is the JVM shutting itself down.  Part of that
> > process
> > > is
> > > > > >>>>>> running
> > > > > >>>>>>>>> the
> > > > > >>>>>>>>> shutdown hooks, so it started thread 3987.  That thread
> is
> > > the
> > > > > >>>>>> shutdown
> > > > > >>>>>>>>> hook from addShutdownHook() in DiskBlockManager.scala,
> > which
> > > > > >>> looks
> > > > > >>>>>> like
> > > > > >>>>>>>>> this:
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>  private def addShutdownHook() {
> > > > > >>>>>>>>>    localDirs.foreach(localDir =>
> > > > > >>>>>>>>> Utils.registerShutdownDeleteDir(localDir))
> > > > > >>>>>>>>>    Runtime.getRuntime.addShutdownHook(new Thread("delete
> > > Spark
> > > > > >>>> local
> > > > > >>>>>>>>> dirs") {
> > > > > >>>>>>>>>      override def run() {
> > > > > >>>>>>>>>        logDebug("Shutdown hook called")
> > > > > >>>>>>>>>        localDirs.foreach { localDir =>
> > > > > >>>>>>>>>          try {
> > > > > >>>>>>>>>            if
> (!Utils.hasRootAsShutdownDeleteDir(localDir))
> > > > > >>>>>>>>> Utils.deleteRecursively(localDir)
> > > > > >>>>>>>>>          } catch {
> > > > > >>>>>>>>>            case t: Throwable =>
> > > > > >>>>>>>>>              logError("Exception while deleting local
> spark
> > > > > >> dir:
> > > > > >>>> " +
> > > > > >>>>>>>>> localDir, t)
> > > > > >>>>>>>>>          }
> > > > > >>>>>>>>>        }
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>        if (shuffleSender != null) {
> > > > > >>>>>>>>>          shuffleSender.stop()
> > > > > >>>>>>>>>        }
> > > > > >>>>>>>>>      }
> > > > > >>>>>>>>>    })
> > > > > >>>>>>>>>  }
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> It goes through and deletes the directories recursively.
>  I
> > > was
> > > > > >>>>>> thinking
> > > > > >>>>>>>>> there might be some issues with concurrently-running
> > shutdown
> > > > > >>> hooks
> > > > > >>>>>>>>> deleting things out from underneath each other (shutdown
> > hook
> > > > > >>>> javadocs
> > > > > >>>>>>>>> say
> > > > > >>>>>>>>> they're all started in parallel if multiple hooks are
> > added)
> > > > > >>>> causing
> > > > > >>>>>> the
> > > > > >>>>>>>>> File.list() in that last thread to take quite some time.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> While I was looking through the stacktrace the JVM
> finally
> > > > > >> exited
> > > > > >>>>>> (after
> > > > > >>>>>>>>> 15-20min at least) so I won't be able to debug more until
> > > this
> > > > > >>> bug
> > > > > >>>>>>>>> strikes
> > > > > >>>>>>>>> again.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Any ideas on what might be going on here?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Thanks!
> > > > > >>>>>>>>> Andrew
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Andrew Ash <an...@andrewash.com>.
An additional shutdown hook to stop the threadpool is much more elegant
than the name matching and thread interrupting I was thinking about.  That
Javadoc looks like it's a best-effort shutdown and won't hard kill threads,
but that's at least a step forward from current behavior.

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html#shutdownNow()


On Fri, Feb 7, 2014 at 12:13 AM, Tathagata Das
<ta...@gmail.com>wrote:

> Or we can try adding a shutdown hook in the
> Executor<
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L127
> >to
> call threadPool.shutdownNow(). May have to catch the
> InterruptedException and handle it gracefully out
> here<
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L255
> >
> .
>
> TD
>
>
> On Thu, Feb 6, 2014 at 11:49 PM, Andrew Ash <an...@andrewash.com> wrote:
>
> > I think we can enumerate all current threads with the ThreadMXBean,
> filter
> > to those threads with the name of executor pool in them, and interrupt
> > them.
> >
> >
> >
> http://docs.oracle.com/javase/6/docs/api/java/lang/management/ManagementFactory.html#getThreadMXBean%28%29
> >
> > The executor threads are currently named according to the pattern
> "Executor
> > task launch worker-X"
> >
> >
> > On Thu, Feb 6, 2014 at 11:45 PM, Tathagata Das
> > <ta...@gmail.com>wrote:
> >
> > > That definitely sound more reliable. Worth trying out if there is a
> > > reliable way of reproducing the deadlock-like scenario.
> > >
> > > TD
> > >
> > >
> > > On Thu, Feb 6, 2014 at 11:38 PM, Matei Zaharia <
> matei.zaharia@gmail.com
> > > >wrote:
> > >
> > > > I don't think we necessarily want to do this through the DAGScheduler
> > > > because the worker might also shut down due to some unusual
> termination
> > > > condition, like the driver node crashing. Can't we do it at the top
> of
> > > the
> > > > shutdown hook instead? If all the threads are in the same thread pool
> > it
> > > > might be possible to interrupt or stop the whole pool.
> > > >
> > > > Matei
> > > >
> > > > On Feb 6, 2014, at 11:30 PM, Andrew Ash <an...@andrewash.com>
> wrote:
> > > >
> > > > > That's genius.  Of course when a worker is told to shutdown it
> should
> > > > > interrupt its worker threads -- I think that would address this
> > issue.
> > > > >
> > > > > Are you thinking to put
> > > > >
> > > > > running.map(_.jobId).foreach { handleJobCancellation }
> > > > >
> > > > > at the top of the StopDAGScheduler block?
> > > > >
> > > > >
> > > > > On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das
> > > > > <ta...@gmail.com>wrote:
> > > > >
> > > > >> Its highly likely that the executor with the threadpool that runs
> > the
> > > > tasks
> > > > >> are the only set of threads that writes to disk. The tasks are
> > > designed
> > > > to
> > > > >> be interrupted when the corresponding job is cancelled. So a
> > > reasonably
> > > > >> simple way could be to actually cancel the currently active jobs,
> > > which
> > > > >> would send the signal to the worker to stop the tasks. Currently,
> > the
> > > > >> DAGScheduler<
> > > > >>
> > > >
> > >
> >
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610
> > > > >>> does
> > > > >> not seem to actually cancel the jobs, only mark them as failed. So
> > it
> > > > >> may be a simple addition.
> > > > >>
> > > > >> There may be some complications with the external spilling of
> > shuffle
> > > > data
> > > > >> to disk not stopping immediately when the task is marked for
> > killing.
> > > > Gotta
> > > > >> try it out.
> > > > >>
> > > > >> TD
> > > > >>
> > > > >> On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <andrew@andrewash.com
> >
> > > > wrote:
> > > > >>
> > > > >>> There is probably just one threadpool that has task threads -- is
> > it
> > > > >>> possible to enumerate and interrupt just those?  We may need to
> > keep
> > > > >> string
> > > > >>> a reference to that threadpool through to the shutdown thread to
> > make
> > > > >> that
> > > > >>> happen.
> > > > >>>
> > > > >>>
> > > > >>> On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <
> > > mridul@gmail.com
> > > > >>>> wrote:
> > > > >>>
> > > > >>>> Ideally, interrupting the thread writing to disk should be
> > > sufficient
> > > > >>>> - though since we are in middle of shutdown when this is
> > happening,
> > > it
> > > > >>>> is best case effort anyway.
> > > > >>>> Identifying which threads to interrupt will be interesting since
> > > most
> > > > >>>> of them are driven by threadpool's and we cant list all threads
> > and
> > > > >>>> interrupt all of them !
> > > > >>>>
> > > > >>>>
> > > > >>>> Regards,
> > > > >>>> Mridul
> > > > >>>>
> > > > >>>>
> > > > >>>> On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <
> andrew@andrewash.com>
> > > > >> wrote:
> > > > >>>>> I think the solution where we stop the writing threads and then
> > let
> > > > >> the
> > > > >>>>> deleting threads completely clean up is the best option since
> the
> > > > >> final
> > > > >>>>> state doesn't have half-deleted temp dirs scattered across the
> > > > >> cluster.
> > > > >>>>>
> > > > >>>>> How feasible do you think it'd be to interrupt the other
> threads?
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <
> > > > >> mridul@gmail.com
> > > > >>>>> wrote:
> > > > >>>>>
> > > > >>>>>> Looks like a pathological corner case here - where the the
> > delete
> > > > >>>>>> thread is not getting run while the OS is busy prioritizing
> the
> > > > >> thread
> > > > >>>>>> writing data (probably with heavy gc too).
> > > > >>>>>> Ideally, the delete thread would list files, remove them and
> > then
> > > > >> fail
> > > > >>>>>> when it tries to remove the non empty directory (since other
> > > thread
> > > > >>>>>> might be creating more in parallel).
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> Regards,
> > > > >>>>>> Mridul
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <
> > andrew@andrewash.com>
> > > > >>>> wrote:
> > > > >>>>>>> Got a repro locally on my MBP (the other was on a CentOS
> > > machine).
> > > > >>>>>>>
> > > > >>>>>>> Build spark, run a master and a worker with the
> > sbin/start-all.sh
> > > > >>>> script,
> > > > >>>>>>> then run this in a shell:
> > > > >>>>>>>
> > > > >>>>>>> import org.apache.spark.storage.StorageLevel._
> > > > >>>>>>> val s = sc.parallelize(1 to
> > > > >>> 1000000000).persist(MEMORY_AND_DISK_SER);
> > > > >>>>>>> s.count
> > > > >>>>>>>
> > > > >>>>>>> After about a minute, this line appears in the shell logging
> > > > >> output:
> > > > >>>>>>>
> > > > >>>>>>> 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing
> > > > >>> BlockManager
> > > > >>>>>>> BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no
> > > > >> recent
> > > > >>>>>> heart
> > > > >>>>>>> beats: 57510ms exceeds 45000ms
> > > > >>>>>>>
> > > > >>>>>>> Ctrl-C the shell.  In jps there is now a worker, a master,
> and
> > a
> > > > >>>>>>> CoarseGrainedExecutorBackend.
> > > > >>>>>>>
> > > > >>>>>>> Run jstack on the CGEBackend JVM, and I got the attached
> > > > >>> stacktraces.
> > > > >>>> I
> > > > >>>>>>> waited around for 15min then kill -9'd the JVM and restarted
> > the
> > > > >>>> process.
> > > > >>>>>>>
> > > > >>>>>>> I wonder if what's happening here is that the threads that
> are
> > > > >>> spewing
> > > > >>>>>> data
> > > > >>>>>>> to disk (as that parallelize and persist would do) can write
> to
> > > > >> disk
> > > > >>>>>> faster
> > > > >>>>>>> than the cleanup threads can delete from disk.
> > > > >>>>>>>
> > > > >>>>>>> What do you think of that theory?
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Andrew
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <
> > > > >>> mridul@gmail.com
> > > > >>>>>
> > > > >>>>>>> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>> shutdown hooks should not take 15 mins are you mentioned !
> > > > >>>>>>>> On the other hand, how busy was your disk when this was
> > > > >> happening ?
> > > > >>>>>>>> (either due to spark or something else ?)
> > > > >>>>>>>>
> > > > >>>>>>>> It might just be that there was a lot of stuff to remove ?
> > > > >>>>>>>>
> > > > >>>>>>>> Regards,
> > > > >>>>>>>> Mridul
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <
> > > andrew@andrewash.com
> > > > >>>
> > > > >>>>>> wrote:
> > > > >>>>>>>>> Hi Spark devs,
> > > > >>>>>>>>>
> > > > >>>>>>>>> Occasionally when hitting Ctrl-C in the scala spark shell
> on
> > > > >>> 0.9.0
> > > > >>>> one
> > > > >>>>>>>>> of
> > > > >>>>>>>>> my workers goes dead in the spark master UI.  I'm using the
> > > > >>>> standalone
> > > > >>>>>>>>> cluster and didn't ever see this while using 0.8.0 so I
> think
> > > > >> it
> > > > >>>> may
> > > > >>>>>> be
> > > > >>>>>>>>> a
> > > > >>>>>>>>> regression.
> > > > >>>>>>>>>
> > > > >>>>>>>>> When I prod on the hung CoarseGrainedExecutorBackend JVM
> with
> > > > >>>> jstack
> > > > >>>>>> and
> > > > >>>>>>>>> jmap -heap, it doesn't respond unless I add the -F force
> > flag.
> > > > >>> The
> > > > >>>>>> heap
> > > > >>>>>>>>> isn't full, but there are some interesting bits in the
> > jstack.
> > > > >>>> Poking
> > > > >>>>>>>>> around a little, I think there may be some kind of deadlock
> > in
> > > > >>> the
> > > > >>>>>>>>> shutdown
> > > > >>>>>>>>> hooks.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Below are the threads I think are most interesting:
> > > > >>>>>>>>>
> > > > >>>>>>>>> Thread 14308: (state = BLOCKED)
> > > > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212
> (Interpreted
> > > > >>>> frame)
> > > > >>>>>>>>> - java.lang.Runtime.exit(int) @bci=14, line=109
> (Interpreted
> > > > >>>> frame)
> > > > >>>>>>>>> - java.lang.System.exit(int) @bci=4, line=962 (Interpreted
> > > > >>> frame)
> > > > >>>>>>>>> -
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
> > > > >>>>>>>>> scala.Function1) @bci=352, line=81 (Interpreted frame)
> > > > >>>>>>>>> - akka.actor.ActorCell.receiveMessage(java.lang.Object)
> > > > >> @bci=25,
> > > > >>>>>>>>> line=498
> > > > >>>>>>>>> (Interpreted frame)
> > > > >>>>>>>>> - akka.actor.ActorCell.invoke(akka.dispatch.Envelope)
> > @bci=39,
> > > > >>>>>> line=456
> > > > >>>>>>>>> (Interpreted frame)
> > > > >>>>>>>>> - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24,
> > > > >>>> line=237
> > > > >>>>>>>>> (Interpreted frame)
> > > > >>>>>>>>> - akka.dispatch.Mailbox.run() @bci=20, line=219
> (Interpreted
> > > > >>>> frame)
> > > > >>>>>>>>> -
> > > > >>>>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
> > > > >>>>>>>>> @bci=4, line=386 (Interpreted frame)
> > > > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10,
> > > > >>>> line=260
> > > > >>>>>>>>> (Compiled frame)
> > > > >>>>>>>>> -
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
> > > > >>>>>>>>> @bci=10, line=1339 (Compiled frame)
> > > > >>>>>>>>> -
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
> > > > >>>>>>>>> @bci=11, line=1979 (Compiled frame)
> > > > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinWorkerThread.run()
> > > > >> @bci=14,
> > > > >>>>>>>>> line=107
> > > > >>>>>>>>> (Interpreted frame)
> > > > >>>>>>>>>
> > > > >>>>>>>>> Thread 3865: (state = BLOCKED)
> > > > >>>>>>>>> - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
> > > > >>>>>>>>> - java.lang.Thread.join(long) @bci=38, line=1280
> (Interpreted
> > > > >>>> frame)
> > > > >>>>>>>>> - java.lang.Thread.join() @bci=2, line=1354 (Interpreted
> > > > >> frame)
> > > > >>>>>>>>> - java.lang.ApplicationShutdownHooks.runHooks() @bci=87,
> > > > >>> line=106
> > > > >>>>>>>>> (Interpreted frame)
> > > > >>>>>>>>> - java.lang.ApplicationShutdownHooks$1.run() @bci=0,
> line=46
> > > > >>>>>>>>> (Interpreted
> > > > >>>>>>>>> frame)
> > > > >>>>>>>>> - java.lang.Shutdown.runHooks() @bci=39, line=123
> > (Interpreted
> > > > >>>> frame)
> > > > >>>>>>>>> - java.lang.Shutdown.sequence() @bci=26, line=167
> > (Interpreted
> > > > >>>> frame)
> > > > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212
> (Interpreted
> > > > >>>> frame)
> > > > >>>>>>>>> - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8,
> > > > >> line=52
> > > > >>>>>>>>> (Interpreted frame)
> > > > >>>>>>>>> - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted
> > frame)
> > > > >>>>>>>>> - java.lang.Thread.run() @bci=11, line=744 (Interpreted
> > frame)
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> Thread 3987: (state = BLOCKED)
> > > > >>>>>>>>> - java.io.UnixFileSystem.list(java.io.File) @bci=0
> > > > >> (Interpreted
> > > > >>>>>> frame)
> > > > >>>>>>>>> - java.io.File.list() @bci=29, line=1116 (Interpreted
> frame)
> > > > >>>>>>>>> - java.io.File.listFiles() @bci=1, line=1201 (Compiled
> frame)
> > > > >>>>>>>>> -
> org.apache.spark.util.Utils$.listFilesSafely(java.io.File)
> > > > >>>> @bci=1,
> > > > >>>>>>>>> line=466 (Interpreted frame)
> > > > >>>>>>>>> -
> > org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> > > > >>>>>> @bci=9,
> > > > >>>>>>>>> line=478 (Compiled frame)
> > > > >>>>>>>>> -
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
> > > > >>>>>>>>> @bci=4, line=479 (Compiled frame)
> > > > >>>>>>>>> -
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
> > > > >>>>>>>>> @bci=5, line=478 (Compiled frame)
> > > > >>>>>>>>> -
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > > > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
> > > > >>>>>>>>> -
> > > > >> scala.collection.mutable.WrappedArray.foreach(scala.Function1)
> > > > >>>>>>>>> @bci=2,
> > > > >>>>>>>>> line=34 (Compiled frame)
> > > > >>>>>>>>> -
> > org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> > > > >>>>>> @bci=19,
> > > > >>>>>>>>> line=478 (Interpreted frame)
> > > > >>>>>>>>> -
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
> > > > >>>>>>>>> @bci=14, line=141 (Interpreted frame)
> > > > >>>>>>>>> -
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
> > > > >>>>>>>>> @bci=5, line=139 (Interpreted frame)
> > > > >>>>>>>>> -
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > > > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
> > > > >>>>>>>>> -
> > > > >>> scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
> > > > >>>>>>>>> @bci=2,
> > > > >>>>>>>>> line=108 (Interpreted frame)
> > > > >>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1.run()
> > > > >>> @bci=39,
> > > > >>>>>>>>> line=139 (Interpreted frame)
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> I think what happened here is that thread 14308 received
> the
> > > > >> akka
> > > > >>>>>>>>> "shutdown" message and called System.exit().  This started
> > > > >> thread
> > > > >>>>>> 3865,
> > > > >>>>>>>>> which is the JVM shutting itself down.  Part of that
> process
> > is
> > > > >>>>>> running
> > > > >>>>>>>>> the
> > > > >>>>>>>>> shutdown hooks, so it started thread 3987.  That thread is
> > the
> > > > >>>>>> shutdown
> > > > >>>>>>>>> hook from addShutdownHook() in DiskBlockManager.scala,
> which
> > > > >>> looks
> > > > >>>>>> like
> > > > >>>>>>>>> this:
> > > > >>>>>>>>>
> > > > >>>>>>>>>  private def addShutdownHook() {
> > > > >>>>>>>>>    localDirs.foreach(localDir =>
> > > > >>>>>>>>> Utils.registerShutdownDeleteDir(localDir))
> > > > >>>>>>>>>    Runtime.getRuntime.addShutdownHook(new Thread("delete
> > Spark
> > > > >>>> local
> > > > >>>>>>>>> dirs") {
> > > > >>>>>>>>>      override def run() {
> > > > >>>>>>>>>        logDebug("Shutdown hook called")
> > > > >>>>>>>>>        localDirs.foreach { localDir =>
> > > > >>>>>>>>>          try {
> > > > >>>>>>>>>            if (!Utils.hasRootAsShutdownDeleteDir(localDir))
> > > > >>>>>>>>> Utils.deleteRecursively(localDir)
> > > > >>>>>>>>>          } catch {
> > > > >>>>>>>>>            case t: Throwable =>
> > > > >>>>>>>>>              logError("Exception while deleting local spark
> > > > >> dir:
> > > > >>>> " +
> > > > >>>>>>>>> localDir, t)
> > > > >>>>>>>>>          }
> > > > >>>>>>>>>        }
> > > > >>>>>>>>>
> > > > >>>>>>>>>        if (shuffleSender != null) {
> > > > >>>>>>>>>          shuffleSender.stop()
> > > > >>>>>>>>>        }
> > > > >>>>>>>>>      }
> > > > >>>>>>>>>    })
> > > > >>>>>>>>>  }
> > > > >>>>>>>>>
> > > > >>>>>>>>> It goes through and deletes the directories recursively.  I
> > was
> > > > >>>>>> thinking
> > > > >>>>>>>>> there might be some issues with concurrently-running
> shutdown
> > > > >>> hooks
> > > > >>>>>>>>> deleting things out from underneath each other (shutdown
> hook
> > > > >>>> javadocs
> > > > >>>>>>>>> say
> > > > >>>>>>>>> they're all started in parallel if multiple hooks are
> added)
> > > > >>>> causing
> > > > >>>>>> the
> > > > >>>>>>>>> File.list() in that last thread to take quite some time.
> > > > >>>>>>>>>
> > > > >>>>>>>>> While I was looking through the stacktrace the JVM finally
> > > > >> exited
> > > > >>>>>> (after
> > > > >>>>>>>>> 15-20min at least) so I won't be able to debug more until
> > this
> > > > >>> bug
> > > > >>>>>>>>> strikes
> > > > >>>>>>>>> again.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Any ideas on what might be going on here?
> > > > >>>>>>>>>
> > > > >>>>>>>>> Thanks!
> > > > >>>>>>>>> Andrew
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > > >
> > >
> >
>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Tathagata Das <ta...@gmail.com>.
Or we can try adding a shutdown hook in the
Executor<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L127>to
call threadPool.shutdownNow(). May have to catch the
InterruptedException and handle it gracefully out
here<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L255>
.

TD


On Thu, Feb 6, 2014 at 11:49 PM, Andrew Ash <an...@andrewash.com> wrote:

> I think we can enumerate all current threads with the ThreadMXBean, filter
> to those threads with the name of executor pool in them, and interrupt
> them.
>
>
> http://docs.oracle.com/javase/6/docs/api/java/lang/management/ManagementFactory.html#getThreadMXBean%28%29
>
> The executor threads are currently named according to the pattern "Executor
> task launch worker-X"
>
>
> On Thu, Feb 6, 2014 at 11:45 PM, Tathagata Das
> <ta...@gmail.com>wrote:
>
> > That definitely sound more reliable. Worth trying out if there is a
> > reliable way of reproducing the deadlock-like scenario.
> >
> > TD
> >
> >
> > On Thu, Feb 6, 2014 at 11:38 PM, Matei Zaharia <matei.zaharia@gmail.com
> > >wrote:
> >
> > > I don't think we necessarily want to do this through the DAGScheduler
> > > because the worker might also shut down due to some unusual termination
> > > condition, like the driver node crashing. Can't we do it at the top of
> > the
> > > shutdown hook instead? If all the threads are in the same thread pool
> it
> > > might be possible to interrupt or stop the whole pool.
> > >
> > > Matei
> > >
> > > On Feb 6, 2014, at 11:30 PM, Andrew Ash <an...@andrewash.com> wrote:
> > >
> > > > That's genius.  Of course when a worker is told to shutdown it should
> > > > interrupt its worker threads -- I think that would address this
> issue.
> > > >
> > > > Are you thinking to put
> > > >
> > > > running.map(_.jobId).foreach { handleJobCancellation }
> > > >
> > > > at the top of the StopDAGScheduler block?
> > > >
> > > >
> > > > On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das
> > > > <ta...@gmail.com>wrote:
> > > >
> > > >> Its highly likely that the executor with the threadpool that runs
> the
> > > tasks
> > > >> are the only set of threads that writes to disk. The tasks are
> > designed
> > > to
> > > >> be interrupted when the corresponding job is cancelled. So a
> > reasonably
> > > >> simple way could be to actually cancel the currently active jobs,
> > which
> > > >> would send the signal to the worker to stop the tasks. Currently,
> the
> > > >> DAGScheduler<
> > > >>
> > >
> >
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610
> > > >>> does
> > > >> not seem to actually cancel the jobs, only mark them as failed. So
> it
> > > >> may be a simple addition.
> > > >>
> > > >> There may be some complications with the external spilling of
> shuffle
> > > data
> > > >> to disk not stopping immediately when the task is marked for
> killing.
> > > Gotta
> > > >> try it out.
> > > >>
> > > >> TD
> > > >>
> > > >> On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <an...@andrewash.com>
> > > wrote:
> > > >>
> > > >>> There is probably just one threadpool that has task threads -- is
> it
> > > >>> possible to enumerate and interrupt just those?  We may need to
> keep
> > > >> string
> > > >>> a reference to that threadpool through to the shutdown thread to
> make
> > > >> that
> > > >>> happen.
> > > >>>
> > > >>>
> > > >>> On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <
> > mridul@gmail.com
> > > >>>> wrote:
> > > >>>
> > > >>>> Ideally, interrupting the thread writing to disk should be
> > sufficient
> > > >>>> - though since we are in middle of shutdown when this is
> happening,
> > it
> > > >>>> is best case effort anyway.
> > > >>>> Identifying which threads to interrupt will be interesting since
> > most
> > > >>>> of them are driven by threadpool's and we cant list all threads
> and
> > > >>>> interrupt all of them !
> > > >>>>
> > > >>>>
> > > >>>> Regards,
> > > >>>> Mridul
> > > >>>>
> > > >>>>
> > > >>>> On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <an...@andrewash.com>
> > > >> wrote:
> > > >>>>> I think the solution where we stop the writing threads and then
> let
> > > >> the
> > > >>>>> deleting threads completely clean up is the best option since the
> > > >> final
> > > >>>>> state doesn't have half-deleted temp dirs scattered across the
> > > >> cluster.
> > > >>>>>
> > > >>>>> How feasible do you think it'd be to interrupt the other threads?
> > > >>>>>
> > > >>>>>
> > > >>>>> On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <
> > > >> mridul@gmail.com
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> Looks like a pathological corner case here - where the the
> delete
> > > >>>>>> thread is not getting run while the OS is busy prioritizing the
> > > >> thread
> > > >>>>>> writing data (probably with heavy gc too).
> > > >>>>>> Ideally, the delete thread would list files, remove them and
> then
> > > >> fail
> > > >>>>>> when it tries to remove the non empty directory (since other
> > thread
> > > >>>>>> might be creating more in parallel).
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> Regards,
> > > >>>>>> Mridul
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <
> andrew@andrewash.com>
> > > >>>> wrote:
> > > >>>>>>> Got a repro locally on my MBP (the other was on a CentOS
> > machine).
> > > >>>>>>>
> > > >>>>>>> Build spark, run a master and a worker with the
> sbin/start-all.sh
> > > >>>> script,
> > > >>>>>>> then run this in a shell:
> > > >>>>>>>
> > > >>>>>>> import org.apache.spark.storage.StorageLevel._
> > > >>>>>>> val s = sc.parallelize(1 to
> > > >>> 1000000000).persist(MEMORY_AND_DISK_SER);
> > > >>>>>>> s.count
> > > >>>>>>>
> > > >>>>>>> After about a minute, this line appears in the shell logging
> > > >> output:
> > > >>>>>>>
> > > >>>>>>> 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing
> > > >>> BlockManager
> > > >>>>>>> BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no
> > > >> recent
> > > >>>>>> heart
> > > >>>>>>> beats: 57510ms exceeds 45000ms
> > > >>>>>>>
> > > >>>>>>> Ctrl-C the shell.  In jps there is now a worker, a master, and
> a
> > > >>>>>>> CoarseGrainedExecutorBackend.
> > > >>>>>>>
> > > >>>>>>> Run jstack on the CGEBackend JVM, and I got the attached
> > > >>> stacktraces.
> > > >>>> I
> > > >>>>>>> waited around for 15min then kill -9'd the JVM and restarted
> the
> > > >>>> process.
> > > >>>>>>>
> > > >>>>>>> I wonder if what's happening here is that the threads that are
> > > >>> spewing
> > > >>>>>> data
> > > >>>>>>> to disk (as that parallelize and persist would do) can write to
> > > >> disk
> > > >>>>>> faster
> > > >>>>>>> than the cleanup threads can delete from disk.
> > > >>>>>>>
> > > >>>>>>> What do you think of that theory?
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Andrew
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <
> > > >>> mridul@gmail.com
> > > >>>>>
> > > >>>>>>> wrote:
> > > >>>>>>>>
> > > >>>>>>>> shutdown hooks should not take 15 mins are you mentioned !
> > > >>>>>>>> On the other hand, how busy was your disk when this was
> > > >> happening ?
> > > >>>>>>>> (either due to spark or something else ?)
> > > >>>>>>>>
> > > >>>>>>>> It might just be that there was a lot of stuff to remove ?
> > > >>>>>>>>
> > > >>>>>>>> Regards,
> > > >>>>>>>> Mridul
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <
> > andrew@andrewash.com
> > > >>>
> > > >>>>>> wrote:
> > > >>>>>>>>> Hi Spark devs,
> > > >>>>>>>>>
> > > >>>>>>>>> Occasionally when hitting Ctrl-C in the scala spark shell on
> > > >>> 0.9.0
> > > >>>> one
> > > >>>>>>>>> of
> > > >>>>>>>>> my workers goes dead in the spark master UI.  I'm using the
> > > >>>> standalone
> > > >>>>>>>>> cluster and didn't ever see this while using 0.8.0 so I think
> > > >> it
> > > >>>> may
> > > >>>>>> be
> > > >>>>>>>>> a
> > > >>>>>>>>> regression.
> > > >>>>>>>>>
> > > >>>>>>>>> When I prod on the hung CoarseGrainedExecutorBackend JVM with
> > > >>>> jstack
> > > >>>>>> and
> > > >>>>>>>>> jmap -heap, it doesn't respond unless I add the -F force
> flag.
> > > >>> The
> > > >>>>>> heap
> > > >>>>>>>>> isn't full, but there are some interesting bits in the
> jstack.
> > > >>>> Poking
> > > >>>>>>>>> around a little, I think there may be some kind of deadlock
> in
> > > >>> the
> > > >>>>>>>>> shutdown
> > > >>>>>>>>> hooks.
> > > >>>>>>>>>
> > > >>>>>>>>> Below are the threads I think are most interesting:
> > > >>>>>>>>>
> > > >>>>>>>>> Thread 14308: (state = BLOCKED)
> > > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
> > > >>>> frame)
> > > >>>>>>>>> - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted
> > > >>>> frame)
> > > >>>>>>>>> - java.lang.System.exit(int) @bci=4, line=962 (Interpreted
> > > >>> frame)
> > > >>>>>>>>> -
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> >
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
> > > >>>>>>>>> scala.Function1) @bci=352, line=81 (Interpreted frame)
> > > >>>>>>>>> - akka.actor.ActorCell.receiveMessage(java.lang.Object)
> > > >> @bci=25,
> > > >>>>>>>>> line=498
> > > >>>>>>>>> (Interpreted frame)
> > > >>>>>>>>> - akka.actor.ActorCell.invoke(akka.dispatch.Envelope)
> @bci=39,
> > > >>>>>> line=456
> > > >>>>>>>>> (Interpreted frame)
> > > >>>>>>>>> - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24,
> > > >>>> line=237
> > > >>>>>>>>> (Interpreted frame)
> > > >>>>>>>>> - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted
> > > >>>> frame)
> > > >>>>>>>>> -
> > > >>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
> > > >>>>>>>>> @bci=4, line=386 (Interpreted frame)
> > > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10,
> > > >>>> line=260
> > > >>>>>>>>> (Compiled frame)
> > > >>>>>>>>> -
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
> > > >>>>>>>>> @bci=10, line=1339 (Compiled frame)
> > > >>>>>>>>> -
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
> > > >>>>>>>>> @bci=11, line=1979 (Compiled frame)
> > > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinWorkerThread.run()
> > > >> @bci=14,
> > > >>>>>>>>> line=107
> > > >>>>>>>>> (Interpreted frame)
> > > >>>>>>>>>
> > > >>>>>>>>> Thread 3865: (state = BLOCKED)
> > > >>>>>>>>> - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
> > > >>>>>>>>> - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted
> > > >>>> frame)
> > > >>>>>>>>> - java.lang.Thread.join() @bci=2, line=1354 (Interpreted
> > > >> frame)
> > > >>>>>>>>> - java.lang.ApplicationShutdownHooks.runHooks() @bci=87,
> > > >>> line=106
> > > >>>>>>>>> (Interpreted frame)
> > > >>>>>>>>> - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46
> > > >>>>>>>>> (Interpreted
> > > >>>>>>>>> frame)
> > > >>>>>>>>> - java.lang.Shutdown.runHooks() @bci=39, line=123
> (Interpreted
> > > >>>> frame)
> > > >>>>>>>>> - java.lang.Shutdown.sequence() @bci=26, line=167
> (Interpreted
> > > >>>> frame)
> > > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
> > > >>>> frame)
> > > >>>>>>>>> - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8,
> > > >> line=52
> > > >>>>>>>>> (Interpreted frame)
> > > >>>>>>>>> - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted
> frame)
> > > >>>>>>>>> - java.lang.Thread.run() @bci=11, line=744 (Interpreted
> frame)
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> Thread 3987: (state = BLOCKED)
> > > >>>>>>>>> - java.io.UnixFileSystem.list(java.io.File) @bci=0
> > > >> (Interpreted
> > > >>>>>> frame)
> > > >>>>>>>>> - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
> > > >>>>>>>>> - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
> > > >>>>>>>>> - org.apache.spark.util.Utils$.listFilesSafely(java.io.File)
> > > >>>> @bci=1,
> > > >>>>>>>>> line=466 (Interpreted frame)
> > > >>>>>>>>> -
> org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> > > >>>>>> @bci=9,
> > > >>>>>>>>> line=478 (Compiled frame)
> > > >>>>>>>>> -
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
> > > >>>>>>>>> @bci=4, line=479 (Compiled frame)
> > > >>>>>>>>> -
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
> > > >>>>>>>>> @bci=5, line=478 (Compiled frame)
> > > >>>>>>>>> -
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
> > > >>>>>>>>> -
> > > >> scala.collection.mutable.WrappedArray.foreach(scala.Function1)
> > > >>>>>>>>> @bci=2,
> > > >>>>>>>>> line=34 (Compiled frame)
> > > >>>>>>>>> -
> org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> > > >>>>>> @bci=19,
> > > >>>>>>>>> line=478 (Interpreted frame)
> > > >>>>>>>>> -
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
> > > >>>>>>>>> @bci=14, line=141 (Interpreted frame)
> > > >>>>>>>>> -
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
> > > >>>>>>>>> @bci=5, line=139 (Interpreted frame)
> > > >>>>>>>>> -
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
> > > >>>>>>>>> -
> > > >>> scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
> > > >>>>>>>>> @bci=2,
> > > >>>>>>>>> line=108 (Interpreted frame)
> > > >>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1.run()
> > > >>> @bci=39,
> > > >>>>>>>>> line=139 (Interpreted frame)
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> I think what happened here is that thread 14308 received the
> > > >> akka
> > > >>>>>>>>> "shutdown" message and called System.exit().  This started
> > > >> thread
> > > >>>>>> 3865,
> > > >>>>>>>>> which is the JVM shutting itself down.  Part of that process
> is
> > > >>>>>> running
> > > >>>>>>>>> the
> > > >>>>>>>>> shutdown hooks, so it started thread 3987.  That thread is
> the
> > > >>>>>> shutdown
> > > >>>>>>>>> hook from addShutdownHook() in DiskBlockManager.scala, which
> > > >>> looks
> > > >>>>>> like
> > > >>>>>>>>> this:
> > > >>>>>>>>>
> > > >>>>>>>>>  private def addShutdownHook() {
> > > >>>>>>>>>    localDirs.foreach(localDir =>
> > > >>>>>>>>> Utils.registerShutdownDeleteDir(localDir))
> > > >>>>>>>>>    Runtime.getRuntime.addShutdownHook(new Thread("delete
> Spark
> > > >>>> local
> > > >>>>>>>>> dirs") {
> > > >>>>>>>>>      override def run() {
> > > >>>>>>>>>        logDebug("Shutdown hook called")
> > > >>>>>>>>>        localDirs.foreach { localDir =>
> > > >>>>>>>>>          try {
> > > >>>>>>>>>            if (!Utils.hasRootAsShutdownDeleteDir(localDir))
> > > >>>>>>>>> Utils.deleteRecursively(localDir)
> > > >>>>>>>>>          } catch {
> > > >>>>>>>>>            case t: Throwable =>
> > > >>>>>>>>>              logError("Exception while deleting local spark
> > > >> dir:
> > > >>>> " +
> > > >>>>>>>>> localDir, t)
> > > >>>>>>>>>          }
> > > >>>>>>>>>        }
> > > >>>>>>>>>
> > > >>>>>>>>>        if (shuffleSender != null) {
> > > >>>>>>>>>          shuffleSender.stop()
> > > >>>>>>>>>        }
> > > >>>>>>>>>      }
> > > >>>>>>>>>    })
> > > >>>>>>>>>  }
> > > >>>>>>>>>
> > > >>>>>>>>> It goes through and deletes the directories recursively.  I
> was
> > > >>>>>> thinking
> > > >>>>>>>>> there might be some issues with concurrently-running shutdown
> > > >>> hooks
> > > >>>>>>>>> deleting things out from underneath each other (shutdown hook
> > > >>>> javadocs
> > > >>>>>>>>> say
> > > >>>>>>>>> they're all started in parallel if multiple hooks are added)
> > > >>>> causing
> > > >>>>>> the
> > > >>>>>>>>> File.list() in that last thread to take quite some time.
> > > >>>>>>>>>
> > > >>>>>>>>> While I was looking through the stacktrace the JVM finally
> > > >> exited
> > > >>>>>> (after
> > > >>>>>>>>> 15-20min at least) so I won't be able to debug more until
> this
> > > >>> bug
> > > >>>>>>>>> strikes
> > > >>>>>>>>> again.
> > > >>>>>>>>>
> > > >>>>>>>>> Any ideas on what might be going on here?
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks!
> > > >>>>>>>>> Andrew
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> > >
> >
>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Andrew Ash <an...@andrewash.com>.
I think we can enumerate all current threads with the ThreadMXBean, filter
to those threads with the name of executor pool in them, and interrupt them.

http://docs.oracle.com/javase/6/docs/api/java/lang/management/ManagementFactory.html#getThreadMXBean%28%29

The executor threads are currently named according to the pattern "Executor
task launch worker-X"


On Thu, Feb 6, 2014 at 11:45 PM, Tathagata Das
<ta...@gmail.com>wrote:

> That definitely sound more reliable. Worth trying out if there is a
> reliable way of reproducing the deadlock-like scenario.
>
> TD
>
>
> On Thu, Feb 6, 2014 at 11:38 PM, Matei Zaharia <matei.zaharia@gmail.com
> >wrote:
>
> > I don't think we necessarily want to do this through the DAGScheduler
> > because the worker might also shut down due to some unusual termination
> > condition, like the driver node crashing. Can't we do it at the top of
> the
> > shutdown hook instead? If all the threads are in the same thread pool it
> > might be possible to interrupt or stop the whole pool.
> >
> > Matei
> >
> > On Feb 6, 2014, at 11:30 PM, Andrew Ash <an...@andrewash.com> wrote:
> >
> > > That's genius.  Of course when a worker is told to shutdown it should
> > > interrupt its worker threads -- I think that would address this issue.
> > >
> > > Are you thinking to put
> > >
> > > running.map(_.jobId).foreach { handleJobCancellation }
> > >
> > > at the top of the StopDAGScheduler block?
> > >
> > >
> > > On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das
> > > <ta...@gmail.com>wrote:
> > >
> > >> Its highly likely that the executor with the threadpool that runs the
> > tasks
> > >> are the only set of threads that writes to disk. The tasks are
> designed
> > to
> > >> be interrupted when the corresponding job is cancelled. So a
> reasonably
> > >> simple way could be to actually cancel the currently active jobs,
> which
> > >> would send the signal to the worker to stop the tasks. Currently, the
> > >> DAGScheduler<
> > >>
> >
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610
> > >>> does
> > >> not seem to actually cancel the jobs, only mark them as failed. So it
> > >> may be a simple addition.
> > >>
> > >> There may be some complications with the external spilling of shuffle
> > data
> > >> to disk not stopping immediately when the task is marked for killing.
> > Gotta
> > >> try it out.
> > >>
> > >> TD
> > >>
> > >> On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <an...@andrewash.com>
> > wrote:
> > >>
> > >>> There is probably just one threadpool that has task threads -- is it
> > >>> possible to enumerate and interrupt just those?  We may need to keep
> > >> string
> > >>> a reference to that threadpool through to the shutdown thread to make
> > >> that
> > >>> happen.
> > >>>
> > >>>
> > >>> On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <
> mridul@gmail.com
> > >>>> wrote:
> > >>>
> > >>>> Ideally, interrupting the thread writing to disk should be
> sufficient
> > >>>> - though since we are in middle of shutdown when this is happening,
> it
> > >>>> is best case effort anyway.
> > >>>> Identifying which threads to interrupt will be interesting since
> most
> > >>>> of them are driven by threadpool's and we cant list all threads and
> > >>>> interrupt all of them !
> > >>>>
> > >>>>
> > >>>> Regards,
> > >>>> Mridul
> > >>>>
> > >>>>
> > >>>> On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <an...@andrewash.com>
> > >> wrote:
> > >>>>> I think the solution where we stop the writing threads and then let
> > >> the
> > >>>>> deleting threads completely clean up is the best option since the
> > >> final
> > >>>>> state doesn't have half-deleted temp dirs scattered across the
> > >> cluster.
> > >>>>>
> > >>>>> How feasible do you think it'd be to interrupt the other threads?
> > >>>>>
> > >>>>>
> > >>>>> On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <
> > >> mridul@gmail.com
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Looks like a pathological corner case here - where the the delete
> > >>>>>> thread is not getting run while the OS is busy prioritizing the
> > >> thread
> > >>>>>> writing data (probably with heavy gc too).
> > >>>>>> Ideally, the delete thread would list files, remove them and then
> > >> fail
> > >>>>>> when it tries to remove the non empty directory (since other
> thread
> > >>>>>> might be creating more in parallel).
> > >>>>>>
> > >>>>>>
> > >>>>>> Regards,
> > >>>>>> Mridul
> > >>>>>>
> > >>>>>>
> > >>>>>> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <an...@andrewash.com>
> > >>>> wrote:
> > >>>>>>> Got a repro locally on my MBP (the other was on a CentOS
> machine).
> > >>>>>>>
> > >>>>>>> Build spark, run a master and a worker with the sbin/start-all.sh
> > >>>> script,
> > >>>>>>> then run this in a shell:
> > >>>>>>>
> > >>>>>>> import org.apache.spark.storage.StorageLevel._
> > >>>>>>> val s = sc.parallelize(1 to
> > >>> 1000000000).persist(MEMORY_AND_DISK_SER);
> > >>>>>>> s.count
> > >>>>>>>
> > >>>>>>> After about a minute, this line appears in the shell logging
> > >> output:
> > >>>>>>>
> > >>>>>>> 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing
> > >>> BlockManager
> > >>>>>>> BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no
> > >> recent
> > >>>>>> heart
> > >>>>>>> beats: 57510ms exceeds 45000ms
> > >>>>>>>
> > >>>>>>> Ctrl-C the shell.  In jps there is now a worker, a master, and a
> > >>>>>>> CoarseGrainedExecutorBackend.
> > >>>>>>>
> > >>>>>>> Run jstack on the CGEBackend JVM, and I got the attached
> > >>> stacktraces.
> > >>>> I
> > >>>>>>> waited around for 15min then kill -9'd the JVM and restarted the
> > >>>> process.
> > >>>>>>>
> > >>>>>>> I wonder if what's happening here is that the threads that are
> > >>> spewing
> > >>>>>> data
> > >>>>>>> to disk (as that parallelize and persist would do) can write to
> > >> disk
> > >>>>>> faster
> > >>>>>>> than the cleanup threads can delete from disk.
> > >>>>>>>
> > >>>>>>> What do you think of that theory?
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Andrew
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <
> > >>> mridul@gmail.com
> > >>>>>
> > >>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>> shutdown hooks should not take 15 mins are you mentioned !
> > >>>>>>>> On the other hand, how busy was your disk when this was
> > >> happening ?
> > >>>>>>>> (either due to spark or something else ?)
> > >>>>>>>>
> > >>>>>>>> It might just be that there was a lot of stuff to remove ?
> > >>>>>>>>
> > >>>>>>>> Regards,
> > >>>>>>>> Mridul
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <
> andrew@andrewash.com
> > >>>
> > >>>>>> wrote:
> > >>>>>>>>> Hi Spark devs,
> > >>>>>>>>>
> > >>>>>>>>> Occasionally when hitting Ctrl-C in the scala spark shell on
> > >>> 0.9.0
> > >>>> one
> > >>>>>>>>> of
> > >>>>>>>>> my workers goes dead in the spark master UI.  I'm using the
> > >>>> standalone
> > >>>>>>>>> cluster and didn't ever see this while using 0.8.0 so I think
> > >> it
> > >>>> may
> > >>>>>> be
> > >>>>>>>>> a
> > >>>>>>>>> regression.
> > >>>>>>>>>
> > >>>>>>>>> When I prod on the hung CoarseGrainedExecutorBackend JVM with
> > >>>> jstack
> > >>>>>> and
> > >>>>>>>>> jmap -heap, it doesn't respond unless I add the -F force flag.
> > >>> The
> > >>>>>> heap
> > >>>>>>>>> isn't full, but there are some interesting bits in the jstack.
> > >>>> Poking
> > >>>>>>>>> around a little, I think there may be some kind of deadlock in
> > >>> the
> > >>>>>>>>> shutdown
> > >>>>>>>>> hooks.
> > >>>>>>>>>
> > >>>>>>>>> Below are the threads I think are most interesting:
> > >>>>>>>>>
> > >>>>>>>>> Thread 14308: (state = BLOCKED)
> > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
> > >>>> frame)
> > >>>>>>>>> - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted
> > >>>> frame)
> > >>>>>>>>> - java.lang.System.exit(int) @bci=4, line=962 (Interpreted
> > >>> frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
> > >>>>>>>>> scala.Function1) @bci=352, line=81 (Interpreted frame)
> > >>>>>>>>> - akka.actor.ActorCell.receiveMessage(java.lang.Object)
> > >> @bci=25,
> > >>>>>>>>> line=498
> > >>>>>>>>> (Interpreted frame)
> > >>>>>>>>> - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39,
> > >>>>>> line=456
> > >>>>>>>>> (Interpreted frame)
> > >>>>>>>>> - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24,
> > >>>> line=237
> > >>>>>>>>> (Interpreted frame)
> > >>>>>>>>> - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted
> > >>>> frame)
> > >>>>>>>>> -
> > >>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
> > >>>>>>>>> @bci=4, line=386 (Interpreted frame)
> > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10,
> > >>>> line=260
> > >>>>>>>>> (Compiled frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
> > >>>>>>>>> @bci=10, line=1339 (Compiled frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
> > >>>>>>>>> @bci=11, line=1979 (Compiled frame)
> > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinWorkerThread.run()
> > >> @bci=14,
> > >>>>>>>>> line=107
> > >>>>>>>>> (Interpreted frame)
> > >>>>>>>>>
> > >>>>>>>>> Thread 3865: (state = BLOCKED)
> > >>>>>>>>> - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
> > >>>>>>>>> - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted
> > >>>> frame)
> > >>>>>>>>> - java.lang.Thread.join() @bci=2, line=1354 (Interpreted
> > >> frame)
> > >>>>>>>>> - java.lang.ApplicationShutdownHooks.runHooks() @bci=87,
> > >>> line=106
> > >>>>>>>>> (Interpreted frame)
> > >>>>>>>>> - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46
> > >>>>>>>>> (Interpreted
> > >>>>>>>>> frame)
> > >>>>>>>>> - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted
> > >>>> frame)
> > >>>>>>>>> - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted
> > >>>> frame)
> > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
> > >>>> frame)
> > >>>>>>>>> - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8,
> > >> line=52
> > >>>>>>>>> (Interpreted frame)
> > >>>>>>>>> - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
> > >>>>>>>>> - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Thread 3987: (state = BLOCKED)
> > >>>>>>>>> - java.io.UnixFileSystem.list(java.io.File) @bci=0
> > >> (Interpreted
> > >>>>>> frame)
> > >>>>>>>>> - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
> > >>>>>>>>> - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
> > >>>>>>>>> - org.apache.spark.util.Utils$.listFilesSafely(java.io.File)
> > >>>> @bci=1,
> > >>>>>>>>> line=466 (Interpreted frame)
> > >>>>>>>>> - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> > >>>>>> @bci=9,
> > >>>>>>>>> line=478 (Compiled frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
> > >>>>>>>>> @bci=4, line=479 (Compiled frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
> > >>>>>>>>> @bci=5, line=478 (Compiled frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
> > >>>>>>>>> -
> > >> scala.collection.mutable.WrappedArray.foreach(scala.Function1)
> > >>>>>>>>> @bci=2,
> > >>>>>>>>> line=34 (Compiled frame)
> > >>>>>>>>> - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> > >>>>>> @bci=19,
> > >>>>>>>>> line=478 (Interpreted frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
> > >>>>>>>>> @bci=14, line=141 (Interpreted frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
> > >>>>>>>>> @bci=5, line=139 (Interpreted frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
> > >>>>>>>>> -
> > >>> scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
> > >>>>>>>>> @bci=2,
> > >>>>>>>>> line=108 (Interpreted frame)
> > >>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1.run()
> > >>> @bci=39,
> > >>>>>>>>> line=139 (Interpreted frame)
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> I think what happened here is that thread 14308 received the
> > >> akka
> > >>>>>>>>> "shutdown" message and called System.exit().  This started
> > >> thread
> > >>>>>> 3865,
> > >>>>>>>>> which is the JVM shutting itself down.  Part of that process is
> > >>>>>> running
> > >>>>>>>>> the
> > >>>>>>>>> shutdown hooks, so it started thread 3987.  That thread is the
> > >>>>>> shutdown
> > >>>>>>>>> hook from addShutdownHook() in DiskBlockManager.scala, which
> > >>> looks
> > >>>>>> like
> > >>>>>>>>> this:
> > >>>>>>>>>
> > >>>>>>>>>  private def addShutdownHook() {
> > >>>>>>>>>    localDirs.foreach(localDir =>
> > >>>>>>>>> Utils.registerShutdownDeleteDir(localDir))
> > >>>>>>>>>    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark
> > >>>> local
> > >>>>>>>>> dirs") {
> > >>>>>>>>>      override def run() {
> > >>>>>>>>>        logDebug("Shutdown hook called")
> > >>>>>>>>>        localDirs.foreach { localDir =>
> > >>>>>>>>>          try {
> > >>>>>>>>>            if (!Utils.hasRootAsShutdownDeleteDir(localDir))
> > >>>>>>>>> Utils.deleteRecursively(localDir)
> > >>>>>>>>>          } catch {
> > >>>>>>>>>            case t: Throwable =>
> > >>>>>>>>>              logError("Exception while deleting local spark
> > >> dir:
> > >>>> " +
> > >>>>>>>>> localDir, t)
> > >>>>>>>>>          }
> > >>>>>>>>>        }
> > >>>>>>>>>
> > >>>>>>>>>        if (shuffleSender != null) {
> > >>>>>>>>>          shuffleSender.stop()
> > >>>>>>>>>        }
> > >>>>>>>>>      }
> > >>>>>>>>>    })
> > >>>>>>>>>  }
> > >>>>>>>>>
> > >>>>>>>>> It goes through and deletes the directories recursively.  I was
> > >>>>>> thinking
> > >>>>>>>>> there might be some issues with concurrently-running shutdown
> > >>> hooks
> > >>>>>>>>> deleting things out from underneath each other (shutdown hook
> > >>>> javadocs
> > >>>>>>>>> say
> > >>>>>>>>> they're all started in parallel if multiple hooks are added)
> > >>>> causing
> > >>>>>> the
> > >>>>>>>>> File.list() in that last thread to take quite some time.
> > >>>>>>>>>
> > >>>>>>>>> While I was looking through the stacktrace the JVM finally
> > >> exited
> > >>>>>> (after
> > >>>>>>>>> 15-20min at least) so I won't be able to debug more until this
> > >>> bug
> > >>>>>>>>> strikes
> > >>>>>>>>> again.
> > >>>>>>>>>
> > >>>>>>>>> Any ideas on what might be going on here?
> > >>>>>>>>>
> > >>>>>>>>> Thanks!
> > >>>>>>>>> Andrew
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> >
>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Tathagata Das <ta...@gmail.com>.
That definitely sound more reliable. Worth trying out if there is a
reliable way of reproducing the deadlock-like scenario.

TD


On Thu, Feb 6, 2014 at 11:38 PM, Matei Zaharia <ma...@gmail.com>wrote:

> I don't think we necessarily want to do this through the DAGScheduler
> because the worker might also shut down due to some unusual termination
> condition, like the driver node crashing. Can't we do it at the top of the
> shutdown hook instead? If all the threads are in the same thread pool it
> might be possible to interrupt or stop the whole pool.
>
> Matei
>
> On Feb 6, 2014, at 11:30 PM, Andrew Ash <an...@andrewash.com> wrote:
>
> > That's genius.  Of course when a worker is told to shutdown it should
> > interrupt its worker threads -- I think that would address this issue.
> >
> > Are you thinking to put
> >
> > running.map(_.jobId).foreach { handleJobCancellation }
> >
> > at the top of the StopDAGScheduler block?
> >
> >
> > On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das
> > <ta...@gmail.com>wrote:
> >
> >> Its highly likely that the executor with the threadpool that runs the
> tasks
> >> are the only set of threads that writes to disk. The tasks are designed
> to
> >> be interrupted when the corresponding job is cancelled. So a reasonably
> >> simple way could be to actually cancel the currently active jobs, which
> >> would send the signal to the worker to stop the tasks. Currently, the
> >> DAGScheduler<
> >>
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610
> >>> does
> >> not seem to actually cancel the jobs, only mark them as failed. So it
> >> may be a simple addition.
> >>
> >> There may be some complications with the external spilling of shuffle
> data
> >> to disk not stopping immediately when the task is marked for killing.
> Gotta
> >> try it out.
> >>
> >> TD
> >>
> >> On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <an...@andrewash.com>
> wrote:
> >>
> >>> There is probably just one threadpool that has task threads -- is it
> >>> possible to enumerate and interrupt just those?  We may need to keep
> >> string
> >>> a reference to that threadpool through to the shutdown thread to make
> >> that
> >>> happen.
> >>>
> >>>
> >>> On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <mridul@gmail.com
> >>>> wrote:
> >>>
> >>>> Ideally, interrupting the thread writing to disk should be sufficient
> >>>> - though since we are in middle of shutdown when this is happening, it
> >>>> is best case effort anyway.
> >>>> Identifying which threads to interrupt will be interesting since most
> >>>> of them are driven by threadpool's and we cant list all threads and
> >>>> interrupt all of them !
> >>>>
> >>>>
> >>>> Regards,
> >>>> Mridul
> >>>>
> >>>>
> >>>> On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <an...@andrewash.com>
> >> wrote:
> >>>>> I think the solution where we stop the writing threads and then let
> >> the
> >>>>> deleting threads completely clean up is the best option since the
> >> final
> >>>>> state doesn't have half-deleted temp dirs scattered across the
> >> cluster.
> >>>>>
> >>>>> How feasible do you think it'd be to interrupt the other threads?
> >>>>>
> >>>>>
> >>>>> On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <
> >> mridul@gmail.com
> >>>>> wrote:
> >>>>>
> >>>>>> Looks like a pathological corner case here - where the the delete
> >>>>>> thread is not getting run while the OS is busy prioritizing the
> >> thread
> >>>>>> writing data (probably with heavy gc too).
> >>>>>> Ideally, the delete thread would list files, remove them and then
> >> fail
> >>>>>> when it tries to remove the non empty directory (since other thread
> >>>>>> might be creating more in parallel).
> >>>>>>
> >>>>>>
> >>>>>> Regards,
> >>>>>> Mridul
> >>>>>>
> >>>>>>
> >>>>>> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <an...@andrewash.com>
> >>>> wrote:
> >>>>>>> Got a repro locally on my MBP (the other was on a CentOS machine).
> >>>>>>>
> >>>>>>> Build spark, run a master and a worker with the sbin/start-all.sh
> >>>> script,
> >>>>>>> then run this in a shell:
> >>>>>>>
> >>>>>>> import org.apache.spark.storage.StorageLevel._
> >>>>>>> val s = sc.parallelize(1 to
> >>> 1000000000).persist(MEMORY_AND_DISK_SER);
> >>>>>>> s.count
> >>>>>>>
> >>>>>>> After about a minute, this line appears in the shell logging
> >> output:
> >>>>>>>
> >>>>>>> 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing
> >>> BlockManager
> >>>>>>> BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no
> >> recent
> >>>>>> heart
> >>>>>>> beats: 57510ms exceeds 45000ms
> >>>>>>>
> >>>>>>> Ctrl-C the shell.  In jps there is now a worker, a master, and a
> >>>>>>> CoarseGrainedExecutorBackend.
> >>>>>>>
> >>>>>>> Run jstack on the CGEBackend JVM, and I got the attached
> >>> stacktraces.
> >>>> I
> >>>>>>> waited around for 15min then kill -9'd the JVM and restarted the
> >>>> process.
> >>>>>>>
> >>>>>>> I wonder if what's happening here is that the threads that are
> >>> spewing
> >>>>>> data
> >>>>>>> to disk (as that parallelize and persist would do) can write to
> >> disk
> >>>>>> faster
> >>>>>>> than the cleanup threads can delete from disk.
> >>>>>>>
> >>>>>>> What do you think of that theory?
> >>>>>>>
> >>>>>>>
> >>>>>>> Andrew
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <
> >>> mridul@gmail.com
> >>>>>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>> shutdown hooks should not take 15 mins are you mentioned !
> >>>>>>>> On the other hand, how busy was your disk when this was
> >> happening ?
> >>>>>>>> (either due to spark or something else ?)
> >>>>>>>>
> >>>>>>>> It might just be that there was a lot of stuff to remove ?
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Mridul
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <andrew@andrewash.com
> >>>
> >>>>>> wrote:
> >>>>>>>>> Hi Spark devs,
> >>>>>>>>>
> >>>>>>>>> Occasionally when hitting Ctrl-C in the scala spark shell on
> >>> 0.9.0
> >>>> one
> >>>>>>>>> of
> >>>>>>>>> my workers goes dead in the spark master UI.  I'm using the
> >>>> standalone
> >>>>>>>>> cluster and didn't ever see this while using 0.8.0 so I think
> >> it
> >>>> may
> >>>>>> be
> >>>>>>>>> a
> >>>>>>>>> regression.
> >>>>>>>>>
> >>>>>>>>> When I prod on the hung CoarseGrainedExecutorBackend JVM with
> >>>> jstack
> >>>>>> and
> >>>>>>>>> jmap -heap, it doesn't respond unless I add the -F force flag.
> >>> The
> >>>>>> heap
> >>>>>>>>> isn't full, but there are some interesting bits in the jstack.
> >>>> Poking
> >>>>>>>>> around a little, I think there may be some kind of deadlock in
> >>> the
> >>>>>>>>> shutdown
> >>>>>>>>> hooks.
> >>>>>>>>>
> >>>>>>>>> Below are the threads I think are most interesting:
> >>>>>>>>>
> >>>>>>>>> Thread 14308: (state = BLOCKED)
> >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
> >>>> frame)
> >>>>>>>>> - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted
> >>>> frame)
> >>>>>>>>> - java.lang.System.exit(int) @bci=4, line=962 (Interpreted
> >>> frame)
> >>>>>>>>> -
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
> >>>>>>>>> scala.Function1) @bci=352, line=81 (Interpreted frame)
> >>>>>>>>> - akka.actor.ActorCell.receiveMessage(java.lang.Object)
> >> @bci=25,
> >>>>>>>>> line=498
> >>>>>>>>> (Interpreted frame)
> >>>>>>>>> - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39,
> >>>>>> line=456
> >>>>>>>>> (Interpreted frame)
> >>>>>>>>> - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24,
> >>>> line=237
> >>>>>>>>> (Interpreted frame)
> >>>>>>>>> - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted
> >>>> frame)
> >>>>>>>>> -
> >>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
> >>>>>>>>> @bci=4, line=386 (Interpreted frame)
> >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10,
> >>>> line=260
> >>>>>>>>> (Compiled frame)
> >>>>>>>>> -
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
> >>>>>>>>> @bci=10, line=1339 (Compiled frame)
> >>>>>>>>> -
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
> >>>>>>>>> @bci=11, line=1979 (Compiled frame)
> >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinWorkerThread.run()
> >> @bci=14,
> >>>>>>>>> line=107
> >>>>>>>>> (Interpreted frame)
> >>>>>>>>>
> >>>>>>>>> Thread 3865: (state = BLOCKED)
> >>>>>>>>> - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
> >>>>>>>>> - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted
> >>>> frame)
> >>>>>>>>> - java.lang.Thread.join() @bci=2, line=1354 (Interpreted
> >> frame)
> >>>>>>>>> - java.lang.ApplicationShutdownHooks.runHooks() @bci=87,
> >>> line=106
> >>>>>>>>> (Interpreted frame)
> >>>>>>>>> - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46
> >>>>>>>>> (Interpreted
> >>>>>>>>> frame)
> >>>>>>>>> - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted
> >>>> frame)
> >>>>>>>>> - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted
> >>>> frame)
> >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
> >>>> frame)
> >>>>>>>>> - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8,
> >> line=52
> >>>>>>>>> (Interpreted frame)
> >>>>>>>>> - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
> >>>>>>>>> - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Thread 3987: (state = BLOCKED)
> >>>>>>>>> - java.io.UnixFileSystem.list(java.io.File) @bci=0
> >> (Interpreted
> >>>>>> frame)
> >>>>>>>>> - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
> >>>>>>>>> - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
> >>>>>>>>> - org.apache.spark.util.Utils$.listFilesSafely(java.io.File)
> >>>> @bci=1,
> >>>>>>>>> line=466 (Interpreted frame)
> >>>>>>>>> - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> >>>>>> @bci=9,
> >>>>>>>>> line=478 (Compiled frame)
> >>>>>>>>> -
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
> >>>>>>>>> @bci=4, line=479 (Compiled frame)
> >>>>>>>>> -
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
> >>>>>>>>> @bci=5, line=478 (Compiled frame)
> >>>>>>>>> -
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
> >>>>>>>>> -
> >> scala.collection.mutable.WrappedArray.foreach(scala.Function1)
> >>>>>>>>> @bci=2,
> >>>>>>>>> line=34 (Compiled frame)
> >>>>>>>>> - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> >>>>>> @bci=19,
> >>>>>>>>> line=478 (Interpreted frame)
> >>>>>>>>> -
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
> >>>>>>>>> @bci=14, line=141 (Interpreted frame)
> >>>>>>>>> -
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
> >>>>>>>>> @bci=5, line=139 (Interpreted frame)
> >>>>>>>>> -
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
> >>>>>>>>> -
> >>> scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
> >>>>>>>>> @bci=2,
> >>>>>>>>> line=108 (Interpreted frame)
> >>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1.run()
> >>> @bci=39,
> >>>>>>>>> line=139 (Interpreted frame)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I think what happened here is that thread 14308 received the
> >> akka
> >>>>>>>>> "shutdown" message and called System.exit().  This started
> >> thread
> >>>>>> 3865,
> >>>>>>>>> which is the JVM shutting itself down.  Part of that process is
> >>>>>> running
> >>>>>>>>> the
> >>>>>>>>> shutdown hooks, so it started thread 3987.  That thread is the
> >>>>>> shutdown
> >>>>>>>>> hook from addShutdownHook() in DiskBlockManager.scala, which
> >>> looks
> >>>>>> like
> >>>>>>>>> this:
> >>>>>>>>>
> >>>>>>>>>  private def addShutdownHook() {
> >>>>>>>>>    localDirs.foreach(localDir =>
> >>>>>>>>> Utils.registerShutdownDeleteDir(localDir))
> >>>>>>>>>    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark
> >>>> local
> >>>>>>>>> dirs") {
> >>>>>>>>>      override def run() {
> >>>>>>>>>        logDebug("Shutdown hook called")
> >>>>>>>>>        localDirs.foreach { localDir =>
> >>>>>>>>>          try {
> >>>>>>>>>            if (!Utils.hasRootAsShutdownDeleteDir(localDir))
> >>>>>>>>> Utils.deleteRecursively(localDir)
> >>>>>>>>>          } catch {
> >>>>>>>>>            case t: Throwable =>
> >>>>>>>>>              logError("Exception while deleting local spark
> >> dir:
> >>>> " +
> >>>>>>>>> localDir, t)
> >>>>>>>>>          }
> >>>>>>>>>        }
> >>>>>>>>>
> >>>>>>>>>        if (shuffleSender != null) {
> >>>>>>>>>          shuffleSender.stop()
> >>>>>>>>>        }
> >>>>>>>>>      }
> >>>>>>>>>    })
> >>>>>>>>>  }
> >>>>>>>>>
> >>>>>>>>> It goes through and deletes the directories recursively.  I was
> >>>>>> thinking
> >>>>>>>>> there might be some issues with concurrently-running shutdown
> >>> hooks
> >>>>>>>>> deleting things out from underneath each other (shutdown hook
> >>>> javadocs
> >>>>>>>>> say
> >>>>>>>>> they're all started in parallel if multiple hooks are added)
> >>>> causing
> >>>>>> the
> >>>>>>>>> File.list() in that last thread to take quite some time.
> >>>>>>>>>
> >>>>>>>>> While I was looking through the stacktrace the JVM finally
> >> exited
> >>>>>> (after
> >>>>>>>>> 15-20min at least) so I won't be able to debug more until this
> >>> bug
> >>>>>>>>> strikes
> >>>>>>>>> again.
> >>>>>>>>>
> >>>>>>>>> Any ideas on what might be going on here?
> >>>>>>>>>
> >>>>>>>>> Thanks!
> >>>>>>>>> Andrew
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
>
>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Matei Zaharia <ma...@gmail.com>.
I don’t think we necessarily want to do this through the DAGScheduler because the worker might also shut down due to some unusual termination condition, like the driver node crashing. Can’t we do it at the top of the shutdown hook instead? If all the threads are in the same thread pool it might be possible to interrupt or stop the whole pool.

Matei

On Feb 6, 2014, at 11:30 PM, Andrew Ash <an...@andrewash.com> wrote:

> That's genius.  Of course when a worker is told to shutdown it should
> interrupt its worker threads -- I think that would address this issue.
> 
> Are you thinking to put
> 
> running.map(_.jobId).foreach { handleJobCancellation }
> 
> at the top of the StopDAGScheduler block?
> 
> 
> On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das
> <ta...@gmail.com>wrote:
> 
>> Its highly likely that the executor with the threadpool that runs the tasks
>> are the only set of threads that writes to disk. The tasks are designed to
>> be interrupted when the corresponding job is cancelled. So a reasonably
>> simple way could be to actually cancel the currently active jobs, which
>> would send the signal to the worker to stop the tasks. Currently, the
>> DAGScheduler<
>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610
>>> does
>> not seem to actually cancel the jobs, only mark them as failed. So it
>> may be a simple addition.
>> 
>> There may be some complications with the external spilling of shuffle data
>> to disk not stopping immediately when the task is marked for killing. Gotta
>> try it out.
>> 
>> TD
>> 
>> On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <an...@andrewash.com> wrote:
>> 
>>> There is probably just one threadpool that has task threads -- is it
>>> possible to enumerate and interrupt just those?  We may need to keep
>> string
>>> a reference to that threadpool through to the shutdown thread to make
>> that
>>> happen.
>>> 
>>> 
>>> On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <mridul@gmail.com
>>>> wrote:
>>> 
>>>> Ideally, interrupting the thread writing to disk should be sufficient
>>>> - though since we are in middle of shutdown when this is happening, it
>>>> is best case effort anyway.
>>>> Identifying which threads to interrupt will be interesting since most
>>>> of them are driven by threadpool's and we cant list all threads and
>>>> interrupt all of them !
>>>> 
>>>> 
>>>> Regards,
>>>> Mridul
>>>> 
>>>> 
>>>> On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <an...@andrewash.com>
>> wrote:
>>>>> I think the solution where we stop the writing threads and then let
>> the
>>>>> deleting threads completely clean up is the best option since the
>> final
>>>>> state doesn't have half-deleted temp dirs scattered across the
>> cluster.
>>>>> 
>>>>> How feasible do you think it'd be to interrupt the other threads?
>>>>> 
>>>>> 
>>>>> On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <
>> mridul@gmail.com
>>>>> wrote:
>>>>> 
>>>>>> Looks like a pathological corner case here - where the the delete
>>>>>> thread is not getting run while the OS is busy prioritizing the
>> thread
>>>>>> writing data (probably with heavy gc too).
>>>>>> Ideally, the delete thread would list files, remove them and then
>> fail
>>>>>> when it tries to remove the non empty directory (since other thread
>>>>>> might be creating more in parallel).
>>>>>> 
>>>>>> 
>>>>>> Regards,
>>>>>> Mridul
>>>>>> 
>>>>>> 
>>>>>> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <an...@andrewash.com>
>>>> wrote:
>>>>>>> Got a repro locally on my MBP (the other was on a CentOS machine).
>>>>>>> 
>>>>>>> Build spark, run a master and a worker with the sbin/start-all.sh
>>>> script,
>>>>>>> then run this in a shell:
>>>>>>> 
>>>>>>> import org.apache.spark.storage.StorageLevel._
>>>>>>> val s = sc.parallelize(1 to
>>> 1000000000).persist(MEMORY_AND_DISK_SER);
>>>>>>> s.count
>>>>>>> 
>>>>>>> After about a minute, this line appears in the shell logging
>> output:
>>>>>>> 
>>>>>>> 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing
>>> BlockManager
>>>>>>> BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no
>> recent
>>>>>> heart
>>>>>>> beats: 57510ms exceeds 45000ms
>>>>>>> 
>>>>>>> Ctrl-C the shell.  In jps there is now a worker, a master, and a
>>>>>>> CoarseGrainedExecutorBackend.
>>>>>>> 
>>>>>>> Run jstack on the CGEBackend JVM, and I got the attached
>>> stacktraces.
>>>> I
>>>>>>> waited around for 15min then kill -9'd the JVM and restarted the
>>>> process.
>>>>>>> 
>>>>>>> I wonder if what's happening here is that the threads that are
>>> spewing
>>>>>> data
>>>>>>> to disk (as that parallelize and persist would do) can write to
>> disk
>>>>>> faster
>>>>>>> than the cleanup threads can delete from disk.
>>>>>>> 
>>>>>>> What do you think of that theory?
>>>>>>> 
>>>>>>> 
>>>>>>> Andrew
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <
>>> mridul@gmail.com
>>>>> 
>>>>>>> wrote:
>>>>>>>> 
>>>>>>>> shutdown hooks should not take 15 mins are you mentioned !
>>>>>>>> On the other hand, how busy was your disk when this was
>> happening ?
>>>>>>>> (either due to spark or something else ?)
>>>>>>>> 
>>>>>>>> It might just be that there was a lot of stuff to remove ?
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Mridul
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <andrew@andrewash.com
>>> 
>>>>>> wrote:
>>>>>>>>> Hi Spark devs,
>>>>>>>>> 
>>>>>>>>> Occasionally when hitting Ctrl-C in the scala spark shell on
>>> 0.9.0
>>>> one
>>>>>>>>> of
>>>>>>>>> my workers goes dead in the spark master UI.  I'm using the
>>>> standalone
>>>>>>>>> cluster and didn't ever see this while using 0.8.0 so I think
>> it
>>>> may
>>>>>> be
>>>>>>>>> a
>>>>>>>>> regression.
>>>>>>>>> 
>>>>>>>>> When I prod on the hung CoarseGrainedExecutorBackend JVM with
>>>> jstack
>>>>>> and
>>>>>>>>> jmap -heap, it doesn't respond unless I add the -F force flag.
>>> The
>>>>>> heap
>>>>>>>>> isn't full, but there are some interesting bits in the jstack.
>>>> Poking
>>>>>>>>> around a little, I think there may be some kind of deadlock in
>>> the
>>>>>>>>> shutdown
>>>>>>>>> hooks.
>>>>>>>>> 
>>>>>>>>> Below are the threads I think are most interesting:
>>>>>>>>> 
>>>>>>>>> Thread 14308: (state = BLOCKED)
>>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
>>>> frame)
>>>>>>>>> - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted
>>>> frame)
>>>>>>>>> - java.lang.System.exit(int) @bci=4, line=962 (Interpreted
>>> frame)
>>>>>>>>> -
>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>>>> 
>>> 
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
>>>>>>>>> scala.Function1) @bci=352, line=81 (Interpreted frame)
>>>>>>>>> - akka.actor.ActorCell.receiveMessage(java.lang.Object)
>> @bci=25,
>>>>>>>>> line=498
>>>>>>>>> (Interpreted frame)
>>>>>>>>> - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39,
>>>>>> line=456
>>>>>>>>> (Interpreted frame)
>>>>>>>>> - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24,
>>>> line=237
>>>>>>>>> (Interpreted frame)
>>>>>>>>> - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted
>>>> frame)
>>>>>>>>> -
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
>>>>>>>>> @bci=4, line=386 (Interpreted frame)
>>>>>>>>> - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10,
>>>> line=260
>>>>>>>>> (Compiled frame)
>>>>>>>>> -
>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>>>> 
>>> 
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
>>>>>>>>> @bci=10, line=1339 (Compiled frame)
>>>>>>>>> -
>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>>>> 
>>> 
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
>>>>>>>>> @bci=11, line=1979 (Compiled frame)
>>>>>>>>> - scala.concurrent.forkjoin.ForkJoinWorkerThread.run()
>> @bci=14,
>>>>>>>>> line=107
>>>>>>>>> (Interpreted frame)
>>>>>>>>> 
>>>>>>>>> Thread 3865: (state = BLOCKED)
>>>>>>>>> - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
>>>>>>>>> - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted
>>>> frame)
>>>>>>>>> - java.lang.Thread.join() @bci=2, line=1354 (Interpreted
>> frame)
>>>>>>>>> - java.lang.ApplicationShutdownHooks.runHooks() @bci=87,
>>> line=106
>>>>>>>>> (Interpreted frame)
>>>>>>>>> - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46
>>>>>>>>> (Interpreted
>>>>>>>>> frame)
>>>>>>>>> - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted
>>>> frame)
>>>>>>>>> - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted
>>>> frame)
>>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
>>>> frame)
>>>>>>>>> - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8,
>> line=52
>>>>>>>>> (Interpreted frame)
>>>>>>>>> - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
>>>>>>>>> - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Thread 3987: (state = BLOCKED)
>>>>>>>>> - java.io.UnixFileSystem.list(java.io.File) @bci=0
>> (Interpreted
>>>>>> frame)
>>>>>>>>> - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
>>>>>>>>> - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
>>>>>>>>> - org.apache.spark.util.Utils$.listFilesSafely(java.io.File)
>>>> @bci=1,
>>>>>>>>> line=466 (Interpreted frame)
>>>>>>>>> - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
>>>>>> @bci=9,
>>>>>>>>> line=478 (Compiled frame)
>>>>>>>>> -
>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>>>> 
>>> 
>> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
>>>>>>>>> @bci=4, line=479 (Compiled frame)
>>>>>>>>> -
>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>>>> 
>>> 
>> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
>>>>>>>>> @bci=5, line=478 (Compiled frame)
>>>>>>>>> -
>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>>>> 
>>> 
>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
>>>>>>>>> -
>> scala.collection.mutable.WrappedArray.foreach(scala.Function1)
>>>>>>>>> @bci=2,
>>>>>>>>> line=34 (Compiled frame)
>>>>>>>>> - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
>>>>>> @bci=19,
>>>>>>>>> line=478 (Interpreted frame)
>>>>>>>>> -
>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>>>> 
>>> 
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
>>>>>>>>> @bci=14, line=141 (Interpreted frame)
>>>>>>>>> -
>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>>>> 
>>> 
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
>>>>>>>>> @bci=5, line=139 (Interpreted frame)
>>>>>>>>> -
>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>>>> 
>>> 
>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
>>>>>>>>> -
>>> scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
>>>>>>>>> @bci=2,
>>>>>>>>> line=108 (Interpreted frame)
>>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1.run()
>>> @bci=39,
>>>>>>>>> line=139 (Interpreted frame)
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> I think what happened here is that thread 14308 received the
>> akka
>>>>>>>>> "shutdown" message and called System.exit().  This started
>> thread
>>>>>> 3865,
>>>>>>>>> which is the JVM shutting itself down.  Part of that process is
>>>>>> running
>>>>>>>>> the
>>>>>>>>> shutdown hooks, so it started thread 3987.  That thread is the
>>>>>> shutdown
>>>>>>>>> hook from addShutdownHook() in DiskBlockManager.scala, which
>>> looks
>>>>>> like
>>>>>>>>> this:
>>>>>>>>> 
>>>>>>>>>  private def addShutdownHook() {
>>>>>>>>>    localDirs.foreach(localDir =>
>>>>>>>>> Utils.registerShutdownDeleteDir(localDir))
>>>>>>>>>    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark
>>>> local
>>>>>>>>> dirs") {
>>>>>>>>>      override def run() {
>>>>>>>>>        logDebug("Shutdown hook called")
>>>>>>>>>        localDirs.foreach { localDir =>
>>>>>>>>>          try {
>>>>>>>>>            if (!Utils.hasRootAsShutdownDeleteDir(localDir))
>>>>>>>>> Utils.deleteRecursively(localDir)
>>>>>>>>>          } catch {
>>>>>>>>>            case t: Throwable =>
>>>>>>>>>              logError("Exception while deleting local spark
>> dir:
>>>> " +
>>>>>>>>> localDir, t)
>>>>>>>>>          }
>>>>>>>>>        }
>>>>>>>>> 
>>>>>>>>>        if (shuffleSender != null) {
>>>>>>>>>          shuffleSender.stop()
>>>>>>>>>        }
>>>>>>>>>      }
>>>>>>>>>    })
>>>>>>>>>  }
>>>>>>>>> 
>>>>>>>>> It goes through and deletes the directories recursively.  I was
>>>>>> thinking
>>>>>>>>> there might be some issues with concurrently-running shutdown
>>> hooks
>>>>>>>>> deleting things out from underneath each other (shutdown hook
>>>> javadocs
>>>>>>>>> say
>>>>>>>>> they're all started in parallel if multiple hooks are added)
>>>> causing
>>>>>> the
>>>>>>>>> File.list() in that last thread to take quite some time.
>>>>>>>>> 
>>>>>>>>> While I was looking through the stacktrace the JVM finally
>> exited
>>>>>> (after
>>>>>>>>> 15-20min at least) so I won't be able to debug more until this
>>> bug
>>>>>>>>> strikes
>>>>>>>>> again.
>>>>>>>>> 
>>>>>>>>> Any ideas on what might be going on here?
>>>>>>>>> 
>>>>>>>>> Thanks!
>>>>>>>>> Andrew
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> 
>>> 
>> 


Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Andrew Ash <an...@andrewash.com>.
That's genius.  Of course when a worker is told to shutdown it should
interrupt its worker threads -- I think that would address this issue.

Are you thinking to put

running.map(_.jobId).foreach { handleJobCancellation }

at the top of the StopDAGScheduler block?


On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das
<ta...@gmail.com>wrote:

> Its highly likely that the executor with the threadpool that runs the tasks
> are the only set of threads that writes to disk. The tasks are designed to
> be interrupted when the corresponding job is cancelled. So a reasonably
> simple way could be to actually cancel the currently active jobs, which
> would send the signal to the worker to stop the tasks. Currently, the
> DAGScheduler<
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610
> >does
> not seem to actually cancel the jobs, only mark them as failed. So it
> may be a simple addition.
>
> There may be some complications with the external spilling of shuffle data
> to disk not stopping immediately when the task is marked for killing. Gotta
> try it out.
>
> TD
>
> On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <an...@andrewash.com> wrote:
>
> > There is probably just one threadpool that has task threads -- is it
> > possible to enumerate and interrupt just those?  We may need to keep
> string
> > a reference to that threadpool through to the shutdown thread to make
> that
> > happen.
> >
> >
> > On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <mridul@gmail.com
> > >wrote:
> >
> > > Ideally, interrupting the thread writing to disk should be sufficient
> > > - though since we are in middle of shutdown when this is happening, it
> > > is best case effort anyway.
> > > Identifying which threads to interrupt will be interesting since most
> > > of them are driven by threadpool's and we cant list all threads and
> > > interrupt all of them !
> > >
> > >
> > > Regards,
> > > Mridul
> > >
> > >
> > > On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <an...@andrewash.com>
> wrote:
> > > > I think the solution where we stop the writing threads and then let
> the
> > > > deleting threads completely clean up is the best option since the
> final
> > > > state doesn't have half-deleted temp dirs scattered across the
> cluster.
> > > >
> > > > How feasible do you think it'd be to interrupt the other threads?
> > > >
> > > >
> > > > On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <
> mridul@gmail.com
> > > >wrote:
> > > >
> > > >> Looks like a pathological corner case here - where the the delete
> > > >> thread is not getting run while the OS is busy prioritizing the
> thread
> > > >> writing data (probably with heavy gc too).
> > > >> Ideally, the delete thread would list files, remove them and then
> fail
> > > >> when it tries to remove the non empty directory (since other thread
> > > >> might be creating more in parallel).
> > > >>
> > > >>
> > > >> Regards,
> > > >> Mridul
> > > >>
> > > >>
> > > >> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <an...@andrewash.com>
> > > wrote:
> > > >> > Got a repro locally on my MBP (the other was on a CentOS machine).
> > > >> >
> > > >> > Build spark, run a master and a worker with the sbin/start-all.sh
> > > script,
> > > >> > then run this in a shell:
> > > >> >
> > > >> > import org.apache.spark.storage.StorageLevel._
> > > >> > val s = sc.parallelize(1 to
> > 1000000000).persist(MEMORY_AND_DISK_SER);
> > > >> > s.count
> > > >> >
> > > >> > After about a minute, this line appears in the shell logging
> output:
> > > >> >
> > > >> > 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing
> > BlockManager
> > > >> > BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no
> recent
> > > >> heart
> > > >> > beats: 57510ms exceeds 45000ms
> > > >> >
> > > >> > Ctrl-C the shell.  In jps there is now a worker, a master, and a
> > > >> > CoarseGrainedExecutorBackend.
> > > >> >
> > > >> > Run jstack on the CGEBackend JVM, and I got the attached
> > stacktraces.
> > >  I
> > > >> > waited around for 15min then kill -9'd the JVM and restarted the
> > > process.
> > > >> >
> > > >> > I wonder if what's happening here is that the threads that are
> > spewing
> > > >> data
> > > >> > to disk (as that parallelize and persist would do) can write to
> disk
> > > >> faster
> > > >> > than the cleanup threads can delete from disk.
> > > >> >
> > > >> > What do you think of that theory?
> > > >> >
> > > >> >
> > > >> > Andrew
> > > >> >
> > > >> >
> > > >> >
> > > >> > On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <
> > mridul@gmail.com
> > > >
> > > >> > wrote:
> > > >> >>
> > > >> >> shutdown hooks should not take 15 mins are you mentioned !
> > > >> >> On the other hand, how busy was your disk when this was
> happening ?
> > > >> >> (either due to spark or something else ?)
> > > >> >>
> > > >> >> It might just be that there was a lot of stuff to remove ?
> > > >> >>
> > > >> >> Regards,
> > > >> >> Mridul
> > > >> >>
> > > >> >>
> > > >> >> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <andrew@andrewash.com
> >
> > > >> wrote:
> > > >> >> > Hi Spark devs,
> > > >> >> >
> > > >> >> > Occasionally when hitting Ctrl-C in the scala spark shell on
> > 0.9.0
> > > one
> > > >> >> > of
> > > >> >> > my workers goes dead in the spark master UI.  I'm using the
> > > standalone
> > > >> >> > cluster and didn't ever see this while using 0.8.0 so I think
> it
> > > may
> > > >> be
> > > >> >> > a
> > > >> >> > regression.
> > > >> >> >
> > > >> >> > When I prod on the hung CoarseGrainedExecutorBackend JVM with
> > > jstack
> > > >> and
> > > >> >> > jmap -heap, it doesn't respond unless I add the -F force flag.
> >  The
> > > >> heap
> > > >> >> > isn't full, but there are some interesting bits in the jstack.
> > >  Poking
> > > >> >> > around a little, I think there may be some kind of deadlock in
> > the
> > > >> >> > shutdown
> > > >> >> > hooks.
> > > >> >> >
> > > >> >> > Below are the threads I think are most interesting:
> > > >> >> >
> > > >> >> > Thread 14308: (state = BLOCKED)
> > > >> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
> > > frame)
> > > >> >> >  - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted
> > > frame)
> > > >> >> >  - java.lang.System.exit(int) @bci=4, line=962 (Interpreted
> > frame)
> > > >> >> >  -
> > > >> >> >
> > > >> >> >
> > > >>
> > >
> >
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
> > > >> >> > scala.Function1) @bci=352, line=81 (Interpreted frame)
> > > >> >> >  - akka.actor.ActorCell.receiveMessage(java.lang.Object)
> @bci=25,
> > > >> >> > line=498
> > > >> >> > (Interpreted frame)
> > > >> >> >  - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39,
> > > >> line=456
> > > >> >> > (Interpreted frame)
> > > >> >> >  - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24,
> > > line=237
> > > >> >> > (Interpreted frame)
> > > >> >> >  - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted
> > > frame)
> > > >> >> >  -
> > > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
> > > >> >> > @bci=4, line=386 (Interpreted frame)
> > > >> >> >  - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10,
> > > line=260
> > > >> >> > (Compiled frame)
> > > >> >> >  -
> > > >> >> >
> > > >> >> >
> > > >>
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
> > > >> >> > @bci=10, line=1339 (Compiled frame)
> > > >> >> >  -
> > > >> >> >
> > > >> >> >
> > > >>
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
> > > >> >> > @bci=11, line=1979 (Compiled frame)
> > > >> >> >  - scala.concurrent.forkjoin.ForkJoinWorkerThread.run()
> @bci=14,
> > > >> >> > line=107
> > > >> >> > (Interpreted frame)
> > > >> >> >
> > > >> >> > Thread 3865: (state = BLOCKED)
> > > >> >> >  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
> > > >> >> >  - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted
> > > frame)
> > > >> >> >  - java.lang.Thread.join() @bci=2, line=1354 (Interpreted
> frame)
> > > >> >> >  - java.lang.ApplicationShutdownHooks.runHooks() @bci=87,
> > line=106
> > > >> >> > (Interpreted frame)
> > > >> >> >  - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46
> > > >> >> > (Interpreted
> > > >> >> > frame)
> > > >> >> >  - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted
> > > frame)
> > > >> >> >  - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted
> > > frame)
> > > >> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
> > > frame)
> > > >> >> >  - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8,
> line=52
> > > >> >> > (Interpreted frame)
> > > >> >> >  - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
> > > >> >> >  - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
> > > >> >> >
> > > >> >> >
> > > >> >> > Thread 3987: (state = BLOCKED)
> > > >> >> >  - java.io.UnixFileSystem.list(java.io.File) @bci=0
> (Interpreted
> > > >> frame)
> > > >> >> >  - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
> > > >> >> >  - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
> > > >> >> >  - org.apache.spark.util.Utils$.listFilesSafely(java.io.File)
> > > @bci=1,
> > > >> >> > line=466 (Interpreted frame)
> > > >> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> > > >> @bci=9,
> > > >> >> > line=478 (Compiled frame)
> > > >> >> >  -
> > > >> >> >
> > > >> >> >
> > > >>
> > >
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
> > > >> >> > @bci=4, line=479 (Compiled frame)
> > > >> >> >  -
> > > >> >> >
> > > >> >> >
> > > >>
> > >
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
> > > >> >> > @bci=5, line=478 (Compiled frame)
> > > >> >> >  -
> > > >> >> >
> > > >> >> >
> > > >>
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > > >> >> > scala.Function1) @bci=22, line=33 (Compiled frame)
> > > >> >> >  -
> scala.collection.mutable.WrappedArray.foreach(scala.Function1)
> > > >> >> > @bci=2,
> > > >> >> > line=34 (Compiled frame)
> > > >> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> > > >> @bci=19,
> > > >> >> > line=478 (Interpreted frame)
> > > >> >> >  -
> > > >> >> >
> > > >> >> >
> > > >>
> > >
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
> > > >> >> > @bci=14, line=141 (Interpreted frame)
> > > >> >> >  -
> > > >> >> >
> > > >> >> >
> > > >>
> > >
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
> > > >> >> > @bci=5, line=139 (Interpreted frame)
> > > >> >> >  -
> > > >> >> >
> > > >> >> >
> > > >>
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > > >> >> > scala.Function1) @bci=22, line=33 (Compiled frame)
> > > >> >> >  -
> > scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
> > > >> >> > @bci=2,
> > > >> >> > line=108 (Interpreted frame)
> > > >> >> >  - org.apache.spark.storage.DiskBlockManager$$anon$1.run()
> > @bci=39,
> > > >> >> > line=139 (Interpreted frame)
> > > >> >> >
> > > >> >> >
> > > >> >> > I think what happened here is that thread 14308 received the
> akka
> > > >> >> > "shutdown" message and called System.exit().  This started
> thread
> > > >> 3865,
> > > >> >> > which is the JVM shutting itself down.  Part of that process is
> > > >> running
> > > >> >> > the
> > > >> >> > shutdown hooks, so it started thread 3987.  That thread is the
> > > >> shutdown
> > > >> >> > hook from addShutdownHook() in DiskBlockManager.scala, which
> > looks
> > > >> like
> > > >> >> > this:
> > > >> >> >
> > > >> >> >   private def addShutdownHook() {
> > > >> >> >     localDirs.foreach(localDir =>
> > > >> >> > Utils.registerShutdownDeleteDir(localDir))
> > > >> >> >     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark
> > > local
> > > >> >> > dirs") {
> > > >> >> >       override def run() {
> > > >> >> >         logDebug("Shutdown hook called")
> > > >> >> >         localDirs.foreach { localDir =>
> > > >> >> >           try {
> > > >> >> >             if (!Utils.hasRootAsShutdownDeleteDir(localDir))
> > > >> >> > Utils.deleteRecursively(localDir)
> > > >> >> >           } catch {
> > > >> >> >             case t: Throwable =>
> > > >> >> >               logError("Exception while deleting local spark
> dir:
> > > " +
> > > >> >> > localDir, t)
> > > >> >> >           }
> > > >> >> >         }
> > > >> >> >
> > > >> >> >         if (shuffleSender != null) {
> > > >> >> >           shuffleSender.stop()
> > > >> >> >         }
> > > >> >> >       }
> > > >> >> >     })
> > > >> >> >   }
> > > >> >> >
> > > >> >> > It goes through and deletes the directories recursively.  I was
> > > >> thinking
> > > >> >> > there might be some issues with concurrently-running shutdown
> > hooks
> > > >> >> > deleting things out from underneath each other (shutdown hook
> > > javadocs
> > > >> >> > say
> > > >> >> > they're all started in parallel if multiple hooks are added)
> > > causing
> > > >> the
> > > >> >> > File.list() in that last thread to take quite some time.
> > > >> >> >
> > > >> >> > While I was looking through the stacktrace the JVM finally
> exited
> > > >> (after
> > > >> >> > 15-20min at least) so I won't be able to debug more until this
> > bug
> > > >> >> > strikes
> > > >> >> > again.
> > > >> >> >
> > > >> >> > Any ideas on what might be going on here?
> > > >> >> >
> > > >> >> > Thanks!
> > > >> >> > Andrew
> > > >> >
> > > >> >
> > > >>
> > >
> >
>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Tathagata Das <ta...@gmail.com>.
Its highly likely that the executor with the threadpool that runs the tasks
are the only set of threads that writes to disk. The tasks are designed to
be interrupted when the corresponding job is cancelled. So a reasonably
simple way could be to actually cancel the currently active jobs, which
would send the signal to the worker to stop the tasks. Currently, the
DAGScheduler<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610>does
not seem to actually cancel the jobs, only mark them as failed. So it
may be a simple addition.

There may be some complications with the external spilling of shuffle data
to disk not stopping immediately when the task is marked for killing. Gotta
try it out.

TD

On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <an...@andrewash.com> wrote:

> There is probably just one threadpool that has task threads -- is it
> possible to enumerate and interrupt just those?  We may need to keep string
> a reference to that threadpool through to the shutdown thread to make that
> happen.
>
>
> On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <mridul@gmail.com
> >wrote:
>
> > Ideally, interrupting the thread writing to disk should be sufficient
> > - though since we are in middle of shutdown when this is happening, it
> > is best case effort anyway.
> > Identifying which threads to interrupt will be interesting since most
> > of them are driven by threadpool's and we cant list all threads and
> > interrupt all of them !
> >
> >
> > Regards,
> > Mridul
> >
> >
> > On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <an...@andrewash.com> wrote:
> > > I think the solution where we stop the writing threads and then let the
> > > deleting threads completely clean up is the best option since the final
> > > state doesn't have half-deleted temp dirs scattered across the cluster.
> > >
> > > How feasible do you think it'd be to interrupt the other threads?
> > >
> > >
> > > On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <mridul@gmail.com
> > >wrote:
> > >
> > >> Looks like a pathological corner case here - where the the delete
> > >> thread is not getting run while the OS is busy prioritizing the thread
> > >> writing data (probably with heavy gc too).
> > >> Ideally, the delete thread would list files, remove them and then fail
> > >> when it tries to remove the non empty directory (since other thread
> > >> might be creating more in parallel).
> > >>
> > >>
> > >> Regards,
> > >> Mridul
> > >>
> > >>
> > >> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <an...@andrewash.com>
> > wrote:
> > >> > Got a repro locally on my MBP (the other was on a CentOS machine).
> > >> >
> > >> > Build spark, run a master and a worker with the sbin/start-all.sh
> > script,
> > >> > then run this in a shell:
> > >> >
> > >> > import org.apache.spark.storage.StorageLevel._
> > >> > val s = sc.parallelize(1 to
> 1000000000).persist(MEMORY_AND_DISK_SER);
> > >> > s.count
> > >> >
> > >> > After about a minute, this line appears in the shell logging output:
> > >> >
> > >> > 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing
> BlockManager
> > >> > BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent
> > >> heart
> > >> > beats: 57510ms exceeds 45000ms
> > >> >
> > >> > Ctrl-C the shell.  In jps there is now a worker, a master, and a
> > >> > CoarseGrainedExecutorBackend.
> > >> >
> > >> > Run jstack on the CGEBackend JVM, and I got the attached
> stacktraces.
> >  I
> > >> > waited around for 15min then kill -9'd the JVM and restarted the
> > process.
> > >> >
> > >> > I wonder if what's happening here is that the threads that are
> spewing
> > >> data
> > >> > to disk (as that parallelize and persist would do) can write to disk
> > >> faster
> > >> > than the cleanup threads can delete from disk.
> > >> >
> > >> > What do you think of that theory?
> > >> >
> > >> >
> > >> > Andrew
> > >> >
> > >> >
> > >> >
> > >> > On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <
> mridul@gmail.com
> > >
> > >> > wrote:
> > >> >>
> > >> >> shutdown hooks should not take 15 mins are you mentioned !
> > >> >> On the other hand, how busy was your disk when this was happening ?
> > >> >> (either due to spark or something else ?)
> > >> >>
> > >> >> It might just be that there was a lot of stuff to remove ?
> > >> >>
> > >> >> Regards,
> > >> >> Mridul
> > >> >>
> > >> >>
> > >> >> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <an...@andrewash.com>
> > >> wrote:
> > >> >> > Hi Spark devs,
> > >> >> >
> > >> >> > Occasionally when hitting Ctrl-C in the scala spark shell on
> 0.9.0
> > one
> > >> >> > of
> > >> >> > my workers goes dead in the spark master UI.  I'm using the
> > standalone
> > >> >> > cluster and didn't ever see this while using 0.8.0 so I think it
> > may
> > >> be
> > >> >> > a
> > >> >> > regression.
> > >> >> >
> > >> >> > When I prod on the hung CoarseGrainedExecutorBackend JVM with
> > jstack
> > >> and
> > >> >> > jmap -heap, it doesn't respond unless I add the -F force flag.
>  The
> > >> heap
> > >> >> > isn't full, but there are some interesting bits in the jstack.
> >  Poking
> > >> >> > around a little, I think there may be some kind of deadlock in
> the
> > >> >> > shutdown
> > >> >> > hooks.
> > >> >> >
> > >> >> > Below are the threads I think are most interesting:
> > >> >> >
> > >> >> > Thread 14308: (state = BLOCKED)
> > >> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
> > frame)
> > >> >> >  - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted
> > frame)
> > >> >> >  - java.lang.System.exit(int) @bci=4, line=962 (Interpreted
> frame)
> > >> >> >  -
> > >> >> >
> > >> >> >
> > >>
> >
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
> > >> >> > scala.Function1) @bci=352, line=81 (Interpreted frame)
> > >> >> >  - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25,
> > >> >> > line=498
> > >> >> > (Interpreted frame)
> > >> >> >  - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39,
> > >> line=456
> > >> >> > (Interpreted frame)
> > >> >> >  - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24,
> > line=237
> > >> >> > (Interpreted frame)
> > >> >> >  - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted
> > frame)
> > >> >> >  -
> > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
> > >> >> > @bci=4, line=386 (Interpreted frame)
> > >> >> >  - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10,
> > line=260
> > >> >> > (Compiled frame)
> > >> >> >  -
> > >> >> >
> > >> >> >
> > >>
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
> > >> >> > @bci=10, line=1339 (Compiled frame)
> > >> >> >  -
> > >> >> >
> > >> >> >
> > >>
> >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
> > >> >> > @bci=11, line=1979 (Compiled frame)
> > >> >> >  - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14,
> > >> >> > line=107
> > >> >> > (Interpreted frame)
> > >> >> >
> > >> >> > Thread 3865: (state = BLOCKED)
> > >> >> >  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
> > >> >> >  - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted
> > frame)
> > >> >> >  - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
> > >> >> >  - java.lang.ApplicationShutdownHooks.runHooks() @bci=87,
> line=106
> > >> >> > (Interpreted frame)
> > >> >> >  - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46
> > >> >> > (Interpreted
> > >> >> > frame)
> > >> >> >  - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted
> > frame)
> > >> >> >  - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted
> > frame)
> > >> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
> > frame)
> > >> >> >  - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52
> > >> >> > (Interpreted frame)
> > >> >> >  - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
> > >> >> >  - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
> > >> >> >
> > >> >> >
> > >> >> > Thread 3987: (state = BLOCKED)
> > >> >> >  - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted
> > >> frame)
> > >> >> >  - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
> > >> >> >  - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
> > >> >> >  - org.apache.spark.util.Utils$.listFilesSafely(java.io.File)
> > @bci=1,
> > >> >> > line=466 (Interpreted frame)
> > >> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> > >> @bci=9,
> > >> >> > line=478 (Compiled frame)
> > >> >> >  -
> > >> >> >
> > >> >> >
> > >>
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
> > >> >> > @bci=4, line=479 (Compiled frame)
> > >> >> >  -
> > >> >> >
> > >> >> >
> > >>
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
> > >> >> > @bci=5, line=478 (Compiled frame)
> > >> >> >  -
> > >> >> >
> > >> >> >
> > >>
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > >> >> > scala.Function1) @bci=22, line=33 (Compiled frame)
> > >> >> >  - scala.collection.mutable.WrappedArray.foreach(scala.Function1)
> > >> >> > @bci=2,
> > >> >> > line=34 (Compiled frame)
> > >> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> > >> @bci=19,
> > >> >> > line=478 (Interpreted frame)
> > >> >> >  -
> > >> >> >
> > >> >> >
> > >>
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
> > >> >> > @bci=14, line=141 (Interpreted frame)
> > >> >> >  -
> > >> >> >
> > >> >> >
> > >>
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
> > >> >> > @bci=5, line=139 (Interpreted frame)
> > >> >> >  -
> > >> >> >
> > >> >> >
> > >>
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > >> >> > scala.Function1) @bci=22, line=33 (Compiled frame)
> > >> >> >  -
> scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
> > >> >> > @bci=2,
> > >> >> > line=108 (Interpreted frame)
> > >> >> >  - org.apache.spark.storage.DiskBlockManager$$anon$1.run()
> @bci=39,
> > >> >> > line=139 (Interpreted frame)
> > >> >> >
> > >> >> >
> > >> >> > I think what happened here is that thread 14308 received the akka
> > >> >> > "shutdown" message and called System.exit().  This started thread
> > >> 3865,
> > >> >> > which is the JVM shutting itself down.  Part of that process is
> > >> running
> > >> >> > the
> > >> >> > shutdown hooks, so it started thread 3987.  That thread is the
> > >> shutdown
> > >> >> > hook from addShutdownHook() in DiskBlockManager.scala, which
> looks
> > >> like
> > >> >> > this:
> > >> >> >
> > >> >> >   private def addShutdownHook() {
> > >> >> >     localDirs.foreach(localDir =>
> > >> >> > Utils.registerShutdownDeleteDir(localDir))
> > >> >> >     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark
> > local
> > >> >> > dirs") {
> > >> >> >       override def run() {
> > >> >> >         logDebug("Shutdown hook called")
> > >> >> >         localDirs.foreach { localDir =>
> > >> >> >           try {
> > >> >> >             if (!Utils.hasRootAsShutdownDeleteDir(localDir))
> > >> >> > Utils.deleteRecursively(localDir)
> > >> >> >           } catch {
> > >> >> >             case t: Throwable =>
> > >> >> >               logError("Exception while deleting local spark dir:
> > " +
> > >> >> > localDir, t)
> > >> >> >           }
> > >> >> >         }
> > >> >> >
> > >> >> >         if (shuffleSender != null) {
> > >> >> >           shuffleSender.stop()
> > >> >> >         }
> > >> >> >       }
> > >> >> >     })
> > >> >> >   }
> > >> >> >
> > >> >> > It goes through and deletes the directories recursively.  I was
> > >> thinking
> > >> >> > there might be some issues with concurrently-running shutdown
> hooks
> > >> >> > deleting things out from underneath each other (shutdown hook
> > javadocs
> > >> >> > say
> > >> >> > they're all started in parallel if multiple hooks are added)
> > causing
> > >> the
> > >> >> > File.list() in that last thread to take quite some time.
> > >> >> >
> > >> >> > While I was looking through the stacktrace the JVM finally exited
> > >> (after
> > >> >> > 15-20min at least) so I won't be able to debug more until this
> bug
> > >> >> > strikes
> > >> >> > again.
> > >> >> >
> > >> >> > Any ideas on what might be going on here?
> > >> >> >
> > >> >> > Thanks!
> > >> >> > Andrew
> > >> >
> > >> >
> > >>
> >
>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Andrew Ash <an...@andrewash.com>.
There is probably just one threadpool that has task threads -- is it
possible to enumerate and interrupt just those?  We may need to keep string
a reference to that threadpool through to the shutdown thread to make that
happen.


On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <mr...@gmail.com>wrote:

> Ideally, interrupting the thread writing to disk should be sufficient
> - though since we are in middle of shutdown when this is happening, it
> is best case effort anyway.
> Identifying which threads to interrupt will be interesting since most
> of them are driven by threadpool's and we cant list all threads and
> interrupt all of them !
>
>
> Regards,
> Mridul
>
>
> On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <an...@andrewash.com> wrote:
> > I think the solution where we stop the writing threads and then let the
> > deleting threads completely clean up is the best option since the final
> > state doesn't have half-deleted temp dirs scattered across the cluster.
> >
> > How feasible do you think it'd be to interrupt the other threads?
> >
> >
> > On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <mridul@gmail.com
> >wrote:
> >
> >> Looks like a pathological corner case here - where the the delete
> >> thread is not getting run while the OS is busy prioritizing the thread
> >> writing data (probably with heavy gc too).
> >> Ideally, the delete thread would list files, remove them and then fail
> >> when it tries to remove the non empty directory (since other thread
> >> might be creating more in parallel).
> >>
> >>
> >> Regards,
> >> Mridul
> >>
> >>
> >> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <an...@andrewash.com>
> wrote:
> >> > Got a repro locally on my MBP (the other was on a CentOS machine).
> >> >
> >> > Build spark, run a master and a worker with the sbin/start-all.sh
> script,
> >> > then run this in a shell:
> >> >
> >> > import org.apache.spark.storage.StorageLevel._
> >> > val s = sc.parallelize(1 to 1000000000).persist(MEMORY_AND_DISK_SER);
> >> > s.count
> >> >
> >> > After about a minute, this line appears in the shell logging output:
> >> >
> >> > 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing BlockManager
> >> > BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent
> >> heart
> >> > beats: 57510ms exceeds 45000ms
> >> >
> >> > Ctrl-C the shell.  In jps there is now a worker, a master, and a
> >> > CoarseGrainedExecutorBackend.
> >> >
> >> > Run jstack on the CGEBackend JVM, and I got the attached stacktraces.
>  I
> >> > waited around for 15min then kill -9'd the JVM and restarted the
> process.
> >> >
> >> > I wonder if what's happening here is that the threads that are spewing
> >> data
> >> > to disk (as that parallelize and persist would do) can write to disk
> >> faster
> >> > than the cleanup threads can delete from disk.
> >> >
> >> > What do you think of that theory?
> >> >
> >> >
> >> > Andrew
> >> >
> >> >
> >> >
> >> > On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <mridul@gmail.com
> >
> >> > wrote:
> >> >>
> >> >> shutdown hooks should not take 15 mins are you mentioned !
> >> >> On the other hand, how busy was your disk when this was happening ?
> >> >> (either due to spark or something else ?)
> >> >>
> >> >> It might just be that there was a lot of stuff to remove ?
> >> >>
> >> >> Regards,
> >> >> Mridul
> >> >>
> >> >>
> >> >> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <an...@andrewash.com>
> >> wrote:
> >> >> > Hi Spark devs,
> >> >> >
> >> >> > Occasionally when hitting Ctrl-C in the scala spark shell on 0.9.0
> one
> >> >> > of
> >> >> > my workers goes dead in the spark master UI.  I'm using the
> standalone
> >> >> > cluster and didn't ever see this while using 0.8.0 so I think it
> may
> >> be
> >> >> > a
> >> >> > regression.
> >> >> >
> >> >> > When I prod on the hung CoarseGrainedExecutorBackend JVM with
> jstack
> >> and
> >> >> > jmap -heap, it doesn't respond unless I add the -F force flag.  The
> >> heap
> >> >> > isn't full, but there are some interesting bits in the jstack.
>  Poking
> >> >> > around a little, I think there may be some kind of deadlock in the
> >> >> > shutdown
> >> >> > hooks.
> >> >> >
> >> >> > Below are the threads I think are most interesting:
> >> >> >
> >> >> > Thread 14308: (state = BLOCKED)
> >> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
> frame)
> >> >> >  - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted
> frame)
> >> >> >  - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
> >> >> >  -
> >> >> >
> >> >> >
> >>
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
> >> >> > scala.Function1) @bci=352, line=81 (Interpreted frame)
> >> >> >  - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25,
> >> >> > line=498
> >> >> > (Interpreted frame)
> >> >> >  - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39,
> >> line=456
> >> >> > (Interpreted frame)
> >> >> >  - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24,
> line=237
> >> >> > (Interpreted frame)
> >> >> >  - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted
> frame)
> >> >> >  -
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
> >> >> > @bci=4, line=386 (Interpreted frame)
> >> >> >  - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10,
> line=260
> >> >> > (Compiled frame)
> >> >> >  -
> >> >> >
> >> >> >
> >>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
> >> >> > @bci=10, line=1339 (Compiled frame)
> >> >> >  -
> >> >> >
> >> >> >
> >>
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
> >> >> > @bci=11, line=1979 (Compiled frame)
> >> >> >  - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14,
> >> >> > line=107
> >> >> > (Interpreted frame)
> >> >> >
> >> >> > Thread 3865: (state = BLOCKED)
> >> >> >  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
> >> >> >  - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted
> frame)
> >> >> >  - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
> >> >> >  - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106
> >> >> > (Interpreted frame)
> >> >> >  - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46
> >> >> > (Interpreted
> >> >> > frame)
> >> >> >  - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted
> frame)
> >> >> >  - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted
> frame)
> >> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted
> frame)
> >> >> >  - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52
> >> >> > (Interpreted frame)
> >> >> >  - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
> >> >> >  - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
> >> >> >
> >> >> >
> >> >> > Thread 3987: (state = BLOCKED)
> >> >> >  - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted
> >> frame)
> >> >> >  - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
> >> >> >  - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
> >> >> >  - org.apache.spark.util.Utils$.listFilesSafely(java.io.File)
> @bci=1,
> >> >> > line=466 (Interpreted frame)
> >> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> >> @bci=9,
> >> >> > line=478 (Compiled frame)
> >> >> >  -
> >> >> >
> >> >> >
> >>
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
> >> >> > @bci=4, line=479 (Compiled frame)
> >> >> >  -
> >> >> >
> >> >> >
> >>
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
> >> >> > @bci=5, line=478 (Compiled frame)
> >> >> >  -
> >> >> >
> >> >> >
> >>
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> >> >> > scala.Function1) @bci=22, line=33 (Compiled frame)
> >> >> >  - scala.collection.mutable.WrappedArray.foreach(scala.Function1)
> >> >> > @bci=2,
> >> >> > line=34 (Compiled frame)
> >> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> >> @bci=19,
> >> >> > line=478 (Interpreted frame)
> >> >> >  -
> >> >> >
> >> >> >
> >>
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
> >> >> > @bci=14, line=141 (Interpreted frame)
> >> >> >  -
> >> >> >
> >> >> >
> >>
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
> >> >> > @bci=5, line=139 (Interpreted frame)
> >> >> >  -
> >> >> >
> >> >> >
> >>
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> >> >> > scala.Function1) @bci=22, line=33 (Compiled frame)
> >> >> >  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
> >> >> > @bci=2,
> >> >> > line=108 (Interpreted frame)
> >> >> >  - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39,
> >> >> > line=139 (Interpreted frame)
> >> >> >
> >> >> >
> >> >> > I think what happened here is that thread 14308 received the akka
> >> >> > "shutdown" message and called System.exit().  This started thread
> >> 3865,
> >> >> > which is the JVM shutting itself down.  Part of that process is
> >> running
> >> >> > the
> >> >> > shutdown hooks, so it started thread 3987.  That thread is the
> >> shutdown
> >> >> > hook from addShutdownHook() in DiskBlockManager.scala, which looks
> >> like
> >> >> > this:
> >> >> >
> >> >> >   private def addShutdownHook() {
> >> >> >     localDirs.foreach(localDir =>
> >> >> > Utils.registerShutdownDeleteDir(localDir))
> >> >> >     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark
> local
> >> >> > dirs") {
> >> >> >       override def run() {
> >> >> >         logDebug("Shutdown hook called")
> >> >> >         localDirs.foreach { localDir =>
> >> >> >           try {
> >> >> >             if (!Utils.hasRootAsShutdownDeleteDir(localDir))
> >> >> > Utils.deleteRecursively(localDir)
> >> >> >           } catch {
> >> >> >             case t: Throwable =>
> >> >> >               logError("Exception while deleting local spark dir:
> " +
> >> >> > localDir, t)
> >> >> >           }
> >> >> >         }
> >> >> >
> >> >> >         if (shuffleSender != null) {
> >> >> >           shuffleSender.stop()
> >> >> >         }
> >> >> >       }
> >> >> >     })
> >> >> >   }
> >> >> >
> >> >> > It goes through and deletes the directories recursively.  I was
> >> thinking
> >> >> > there might be some issues with concurrently-running shutdown hooks
> >> >> > deleting things out from underneath each other (shutdown hook
> javadocs
> >> >> > say
> >> >> > they're all started in parallel if multiple hooks are added)
> causing
> >> the
> >> >> > File.list() in that last thread to take quite some time.
> >> >> >
> >> >> > While I was looking through the stacktrace the JVM finally exited
> >> (after
> >> >> > 15-20min at least) so I won't be able to debug more until this bug
> >> >> > strikes
> >> >> > again.
> >> >> >
> >> >> > Any ideas on what might be going on here?
> >> >> >
> >> >> > Thanks!
> >> >> > Andrew
> >> >
> >> >
> >>
>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Mridul Muralidharan <mr...@gmail.com>.
Ideally, interrupting the thread writing to disk should be sufficient
- though since we are in middle of shutdown when this is happening, it
is best case effort anyway.
Identifying which threads to interrupt will be interesting since most
of them are driven by threadpool's and we cant list all threads and
interrupt all of them !


Regards,
Mridul


On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <an...@andrewash.com> wrote:
> I think the solution where we stop the writing threads and then let the
> deleting threads completely clean up is the best option since the final
> state doesn't have half-deleted temp dirs scattered across the cluster.
>
> How feasible do you think it'd be to interrupt the other threads?
>
>
> On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <mr...@gmail.com>wrote:
>
>> Looks like a pathological corner case here - where the the delete
>> thread is not getting run while the OS is busy prioritizing the thread
>> writing data (probably with heavy gc too).
>> Ideally, the delete thread would list files, remove them and then fail
>> when it tries to remove the non empty directory (since other thread
>> might be creating more in parallel).
>>
>>
>> Regards,
>> Mridul
>>
>>
>> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <an...@andrewash.com> wrote:
>> > Got a repro locally on my MBP (the other was on a CentOS machine).
>> >
>> > Build spark, run a master and a worker with the sbin/start-all.sh script,
>> > then run this in a shell:
>> >
>> > import org.apache.spark.storage.StorageLevel._
>> > val s = sc.parallelize(1 to 1000000000).persist(MEMORY_AND_DISK_SER);
>> > s.count
>> >
>> > After about a minute, this line appears in the shell logging output:
>> >
>> > 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing BlockManager
>> > BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent
>> heart
>> > beats: 57510ms exceeds 45000ms
>> >
>> > Ctrl-C the shell.  In jps there is now a worker, a master, and a
>> > CoarseGrainedExecutorBackend.
>> >
>> > Run jstack on the CGEBackend JVM, and I got the attached stacktraces.  I
>> > waited around for 15min then kill -9'd the JVM and restarted the process.
>> >
>> > I wonder if what's happening here is that the threads that are spewing
>> data
>> > to disk (as that parallelize and persist would do) can write to disk
>> faster
>> > than the cleanup threads can delete from disk.
>> >
>> > What do you think of that theory?
>> >
>> >
>> > Andrew
>> >
>> >
>> >
>> > On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <mr...@gmail.com>
>> > wrote:
>> >>
>> >> shutdown hooks should not take 15 mins are you mentioned !
>> >> On the other hand, how busy was your disk when this was happening ?
>> >> (either due to spark or something else ?)
>> >>
>> >> It might just be that there was a lot of stuff to remove ?
>> >>
>> >> Regards,
>> >> Mridul
>> >>
>> >>
>> >> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <an...@andrewash.com>
>> wrote:
>> >> > Hi Spark devs,
>> >> >
>> >> > Occasionally when hitting Ctrl-C in the scala spark shell on 0.9.0 one
>> >> > of
>> >> > my workers goes dead in the spark master UI.  I'm using the standalone
>> >> > cluster and didn't ever see this while using 0.8.0 so I think it may
>> be
>> >> > a
>> >> > regression.
>> >> >
>> >> > When I prod on the hung CoarseGrainedExecutorBackend JVM with jstack
>> and
>> >> > jmap -heap, it doesn't respond unless I add the -F force flag.  The
>> heap
>> >> > isn't full, but there are some interesting bits in the jstack.  Poking
>> >> > around a little, I think there may be some kind of deadlock in the
>> >> > shutdown
>> >> > hooks.
>> >> >
>> >> > Below are the threads I think are most interesting:
>> >> >
>> >> > Thread 14308: (state = BLOCKED)
>> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
>> >> >  - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
>> >> >  - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
>> >> >  -
>> >> >
>> >> >
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
>> >> > scala.Function1) @bci=352, line=81 (Interpreted frame)
>> >> >  - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25,
>> >> > line=498
>> >> > (Interpreted frame)
>> >> >  - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39,
>> line=456
>> >> > (Interpreted frame)
>> >> >  - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237
>> >> > (Interpreted frame)
>> >> >  - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
>> >> >  - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
>> >> > @bci=4, line=386 (Interpreted frame)
>> >> >  - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260
>> >> > (Compiled frame)
>> >> >  -
>> >> >
>> >> >
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
>> >> > @bci=10, line=1339 (Compiled frame)
>> >> >  -
>> >> >
>> >> >
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
>> >> > @bci=11, line=1979 (Compiled frame)
>> >> >  - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14,
>> >> > line=107
>> >> > (Interpreted frame)
>> >> >
>> >> > Thread 3865: (state = BLOCKED)
>> >> >  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
>> >> >  - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
>> >> >  - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
>> >> >  - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106
>> >> > (Interpreted frame)
>> >> >  - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46
>> >> > (Interpreted
>> >> > frame)
>> >> >  - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
>> >> >  - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
>> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
>> >> >  - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52
>> >> > (Interpreted frame)
>> >> >  - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
>> >> >  - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
>> >> >
>> >> >
>> >> > Thread 3987: (state = BLOCKED)
>> >> >  - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted
>> frame)
>> >> >  - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
>> >> >  - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
>> >> >  - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1,
>> >> > line=466 (Interpreted frame)
>> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
>> @bci=9,
>> >> > line=478 (Compiled frame)
>> >> >  -
>> >> >
>> >> >
>> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
>> >> > @bci=4, line=479 (Compiled frame)
>> >> >  -
>> >> >
>> >> >
>> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
>> >> > @bci=5, line=478 (Compiled frame)
>> >> >  -
>> >> >
>> >> >
>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>> >> > scala.Function1) @bci=22, line=33 (Compiled frame)
>> >> >  - scala.collection.mutable.WrappedArray.foreach(scala.Function1)
>> >> > @bci=2,
>> >> > line=34 (Compiled frame)
>> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
>> @bci=19,
>> >> > line=478 (Interpreted frame)
>> >> >  -
>> >> >
>> >> >
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
>> >> > @bci=14, line=141 (Interpreted frame)
>> >> >  -
>> >> >
>> >> >
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
>> >> > @bci=5, line=139 (Interpreted frame)
>> >> >  -
>> >> >
>> >> >
>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>> >> > scala.Function1) @bci=22, line=33 (Compiled frame)
>> >> >  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
>> >> > @bci=2,
>> >> > line=108 (Interpreted frame)
>> >> >  - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39,
>> >> > line=139 (Interpreted frame)
>> >> >
>> >> >
>> >> > I think what happened here is that thread 14308 received the akka
>> >> > "shutdown" message and called System.exit().  This started thread
>> 3865,
>> >> > which is the JVM shutting itself down.  Part of that process is
>> running
>> >> > the
>> >> > shutdown hooks, so it started thread 3987.  That thread is the
>> shutdown
>> >> > hook from addShutdownHook() in DiskBlockManager.scala, which looks
>> like
>> >> > this:
>> >> >
>> >> >   private def addShutdownHook() {
>> >> >     localDirs.foreach(localDir =>
>> >> > Utils.registerShutdownDeleteDir(localDir))
>> >> >     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local
>> >> > dirs") {
>> >> >       override def run() {
>> >> >         logDebug("Shutdown hook called")
>> >> >         localDirs.foreach { localDir =>
>> >> >           try {
>> >> >             if (!Utils.hasRootAsShutdownDeleteDir(localDir))
>> >> > Utils.deleteRecursively(localDir)
>> >> >           } catch {
>> >> >             case t: Throwable =>
>> >> >               logError("Exception while deleting local spark dir: " +
>> >> > localDir, t)
>> >> >           }
>> >> >         }
>> >> >
>> >> >         if (shuffleSender != null) {
>> >> >           shuffleSender.stop()
>> >> >         }
>> >> >       }
>> >> >     })
>> >> >   }
>> >> >
>> >> > It goes through and deletes the directories recursively.  I was
>> thinking
>> >> > there might be some issues with concurrently-running shutdown hooks
>> >> > deleting things out from underneath each other (shutdown hook javadocs
>> >> > say
>> >> > they're all started in parallel if multiple hooks are added) causing
>> the
>> >> > File.list() in that last thread to take quite some time.
>> >> >
>> >> > While I was looking through the stacktrace the JVM finally exited
>> (after
>> >> > 15-20min at least) so I won't be able to debug more until this bug
>> >> > strikes
>> >> > again.
>> >> >
>> >> > Any ideas on what might be going on here?
>> >> >
>> >> > Thanks!
>> >> > Andrew
>> >
>> >
>>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Andrew Ash <an...@andrewash.com>.
I think the solution where we stop the writing threads and then let the
deleting threads completely clean up is the best option since the final
state doesn't have half-deleted temp dirs scattered across the cluster.

How feasible do you think it'd be to interrupt the other threads?


On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <mr...@gmail.com>wrote:

> Looks like a pathological corner case here - where the the delete
> thread is not getting run while the OS is busy prioritizing the thread
> writing data (probably with heavy gc too).
> Ideally, the delete thread would list files, remove them and then fail
> when it tries to remove the non empty directory (since other thread
> might be creating more in parallel).
>
>
> Regards,
> Mridul
>
>
> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <an...@andrewash.com> wrote:
> > Got a repro locally on my MBP (the other was on a CentOS machine).
> >
> > Build spark, run a master and a worker with the sbin/start-all.sh script,
> > then run this in a shell:
> >
> > import org.apache.spark.storage.StorageLevel._
> > val s = sc.parallelize(1 to 1000000000).persist(MEMORY_AND_DISK_SER);
> > s.count
> >
> > After about a minute, this line appears in the shell logging output:
> >
> > 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing BlockManager
> > BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent
> heart
> > beats: 57510ms exceeds 45000ms
> >
> > Ctrl-C the shell.  In jps there is now a worker, a master, and a
> > CoarseGrainedExecutorBackend.
> >
> > Run jstack on the CGEBackend JVM, and I got the attached stacktraces.  I
> > waited around for 15min then kill -9'd the JVM and restarted the process.
> >
> > I wonder if what's happening here is that the threads that are spewing
> data
> > to disk (as that parallelize and persist would do) can write to disk
> faster
> > than the cleanup threads can delete from disk.
> >
> > What do you think of that theory?
> >
> >
> > Andrew
> >
> >
> >
> > On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <mr...@gmail.com>
> > wrote:
> >>
> >> shutdown hooks should not take 15 mins are you mentioned !
> >> On the other hand, how busy was your disk when this was happening ?
> >> (either due to spark or something else ?)
> >>
> >> It might just be that there was a lot of stuff to remove ?
> >>
> >> Regards,
> >> Mridul
> >>
> >>
> >> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <an...@andrewash.com>
> wrote:
> >> > Hi Spark devs,
> >> >
> >> > Occasionally when hitting Ctrl-C in the scala spark shell on 0.9.0 one
> >> > of
> >> > my workers goes dead in the spark master UI.  I'm using the standalone
> >> > cluster and didn't ever see this while using 0.8.0 so I think it may
> be
> >> > a
> >> > regression.
> >> >
> >> > When I prod on the hung CoarseGrainedExecutorBackend JVM with jstack
> and
> >> > jmap -heap, it doesn't respond unless I add the -F force flag.  The
> heap
> >> > isn't full, but there are some interesting bits in the jstack.  Poking
> >> > around a little, I think there may be some kind of deadlock in the
> >> > shutdown
> >> > hooks.
> >> >
> >> > Below are the threads I think are most interesting:
> >> >
> >> > Thread 14308: (state = BLOCKED)
> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
> >> >  - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
> >> >  - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
> >> >  -
> >> >
> >> >
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
> >> > scala.Function1) @bci=352, line=81 (Interpreted frame)
> >> >  - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25,
> >> > line=498
> >> > (Interpreted frame)
> >> >  - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39,
> line=456
> >> > (Interpreted frame)
> >> >  - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237
> >> > (Interpreted frame)
> >> >  - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
> >> >  - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
> >> > @bci=4, line=386 (Interpreted frame)
> >> >  - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260
> >> > (Compiled frame)
> >> >  -
> >> >
> >> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
> >> > @bci=10, line=1339 (Compiled frame)
> >> >  -
> >> >
> >> >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
> >> > @bci=11, line=1979 (Compiled frame)
> >> >  - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14,
> >> > line=107
> >> > (Interpreted frame)
> >> >
> >> > Thread 3865: (state = BLOCKED)
> >> >  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
> >> >  - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
> >> >  - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
> >> >  - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106
> >> > (Interpreted frame)
> >> >  - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46
> >> > (Interpreted
> >> > frame)
> >> >  - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
> >> >  - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
> >> >  - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52
> >> > (Interpreted frame)
> >> >  - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
> >> >  - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
> >> >
> >> >
> >> > Thread 3987: (state = BLOCKED)
> >> >  - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted
> frame)
> >> >  - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
> >> >  - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
> >> >  - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1,
> >> > line=466 (Interpreted frame)
> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> @bci=9,
> >> > line=478 (Compiled frame)
> >> >  -
> >> >
> >> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
> >> > @bci=4, line=479 (Compiled frame)
> >> >  -
> >> >
> >> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
> >> > @bci=5, line=478 (Compiled frame)
> >> >  -
> >> >
> >> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> >> > scala.Function1) @bci=22, line=33 (Compiled frame)
> >> >  - scala.collection.mutable.WrappedArray.foreach(scala.Function1)
> >> > @bci=2,
> >> > line=34 (Compiled frame)
> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> @bci=19,
> >> > line=478 (Interpreted frame)
> >> >  -
> >> >
> >> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
> >> > @bci=14, line=141 (Interpreted frame)
> >> >  -
> >> >
> >> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
> >> > @bci=5, line=139 (Interpreted frame)
> >> >  -
> >> >
> >> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> >> > scala.Function1) @bci=22, line=33 (Compiled frame)
> >> >  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
> >> > @bci=2,
> >> > line=108 (Interpreted frame)
> >> >  - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39,
> >> > line=139 (Interpreted frame)
> >> >
> >> >
> >> > I think what happened here is that thread 14308 received the akka
> >> > "shutdown" message and called System.exit().  This started thread
> 3865,
> >> > which is the JVM shutting itself down.  Part of that process is
> running
> >> > the
> >> > shutdown hooks, so it started thread 3987.  That thread is the
> shutdown
> >> > hook from addShutdownHook() in DiskBlockManager.scala, which looks
> like
> >> > this:
> >> >
> >> >   private def addShutdownHook() {
> >> >     localDirs.foreach(localDir =>
> >> > Utils.registerShutdownDeleteDir(localDir))
> >> >     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local
> >> > dirs") {
> >> >       override def run() {
> >> >         logDebug("Shutdown hook called")
> >> >         localDirs.foreach { localDir =>
> >> >           try {
> >> >             if (!Utils.hasRootAsShutdownDeleteDir(localDir))
> >> > Utils.deleteRecursively(localDir)
> >> >           } catch {
> >> >             case t: Throwable =>
> >> >               logError("Exception while deleting local spark dir: " +
> >> > localDir, t)
> >> >           }
> >> >         }
> >> >
> >> >         if (shuffleSender != null) {
> >> >           shuffleSender.stop()
> >> >         }
> >> >       }
> >> >     })
> >> >   }
> >> >
> >> > It goes through and deletes the directories recursively.  I was
> thinking
> >> > there might be some issues with concurrently-running shutdown hooks
> >> > deleting things out from underneath each other (shutdown hook javadocs
> >> > say
> >> > they're all started in parallel if multiple hooks are added) causing
> the
> >> > File.list() in that last thread to take quite some time.
> >> >
> >> > While I was looking through the stacktrace the JVM finally exited
> (after
> >> > 15-20min at least) so I won't be able to debug more until this bug
> >> > strikes
> >> > again.
> >> >
> >> > Any ideas on what might be going on here?
> >> >
> >> > Thanks!
> >> > Andrew
> >
> >
>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Mridul Muralidharan <mr...@gmail.com>.
Looks like a pathological corner case here - where the the delete
thread is not getting run while the OS is busy prioritizing the thread
writing data (probably with heavy gc too).
Ideally, the delete thread would list files, remove them and then fail
when it tries to remove the non empty directory (since other thread
might be creating more in parallel).


Regards,
Mridul


On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <an...@andrewash.com> wrote:
> Got a repro locally on my MBP (the other was on a CentOS machine).
>
> Build spark, run a master and a worker with the sbin/start-all.sh script,
> then run this in a shell:
>
> import org.apache.spark.storage.StorageLevel._
> val s = sc.parallelize(1 to 1000000000).persist(MEMORY_AND_DISK_SER);
> s.count
>
> After about a minute, this line appears in the shell logging output:
>
> 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing BlockManager
> BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent heart
> beats: 57510ms exceeds 45000ms
>
> Ctrl-C the shell.  In jps there is now a worker, a master, and a
> CoarseGrainedExecutorBackend.
>
> Run jstack on the CGEBackend JVM, and I got the attached stacktraces.  I
> waited around for 15min then kill -9'd the JVM and restarted the process.
>
> I wonder if what's happening here is that the threads that are spewing data
> to disk (as that parallelize and persist would do) can write to disk faster
> than the cleanup threads can delete from disk.
>
> What do you think of that theory?
>
>
> Andrew
>
>
>
> On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <mr...@gmail.com>
> wrote:
>>
>> shutdown hooks should not take 15 mins are you mentioned !
>> On the other hand, how busy was your disk when this was happening ?
>> (either due to spark or something else ?)
>>
>> It might just be that there was a lot of stuff to remove ?
>>
>> Regards,
>> Mridul
>>
>>
>> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <an...@andrewash.com> wrote:
>> > Hi Spark devs,
>> >
>> > Occasionally when hitting Ctrl-C in the scala spark shell on 0.9.0 one
>> > of
>> > my workers goes dead in the spark master UI.  I'm using the standalone
>> > cluster and didn't ever see this while using 0.8.0 so I think it may be
>> > a
>> > regression.
>> >
>> > When I prod on the hung CoarseGrainedExecutorBackend JVM with jstack and
>> > jmap -heap, it doesn't respond unless I add the -F force flag.  The heap
>> > isn't full, but there are some interesting bits in the jstack.  Poking
>> > around a little, I think there may be some kind of deadlock in the
>> > shutdown
>> > hooks.
>> >
>> > Below are the threads I think are most interesting:
>> >
>> > Thread 14308: (state = BLOCKED)
>> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
>> >  - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
>> >  - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
>> >  -
>> >
>> > org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
>> > scala.Function1) @bci=352, line=81 (Interpreted frame)
>> >  - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25,
>> > line=498
>> > (Interpreted frame)
>> >  - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39, line=456
>> > (Interpreted frame)
>> >  - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237
>> > (Interpreted frame)
>> >  - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
>> >  - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
>> > @bci=4, line=386 (Interpreted frame)
>> >  - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260
>> > (Compiled frame)
>> >  -
>> >
>> > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
>> > @bci=10, line=1339 (Compiled frame)
>> >  -
>> >
>> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
>> > @bci=11, line=1979 (Compiled frame)
>> >  - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14,
>> > line=107
>> > (Interpreted frame)
>> >
>> > Thread 3865: (state = BLOCKED)
>> >  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
>> >  - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
>> >  - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
>> >  - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106
>> > (Interpreted frame)
>> >  - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46
>> > (Interpreted
>> > frame)
>> >  - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
>> >  - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
>> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
>> >  - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52
>> > (Interpreted frame)
>> >  - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
>> >  - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
>> >
>> >
>> > Thread 3987: (state = BLOCKED)
>> >  - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted frame)
>> >  - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
>> >  - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
>> >  - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1,
>> > line=466 (Interpreted frame)
>> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=9,
>> > line=478 (Compiled frame)
>> >  -
>> >
>> > org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
>> > @bci=4, line=479 (Compiled frame)
>> >  -
>> >
>> > org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
>> > @bci=5, line=478 (Compiled frame)
>> >  -
>> >
>> > scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>> > scala.Function1) @bci=22, line=33 (Compiled frame)
>> >  - scala.collection.mutable.WrappedArray.foreach(scala.Function1)
>> > @bci=2,
>> > line=34 (Compiled frame)
>> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=19,
>> > line=478 (Interpreted frame)
>> >  -
>> >
>> > org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
>> > @bci=14, line=141 (Interpreted frame)
>> >  -
>> >
>> > org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
>> > @bci=5, line=139 (Interpreted frame)
>> >  -
>> >
>> > scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>> > scala.Function1) @bci=22, line=33 (Compiled frame)
>> >  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
>> > @bci=2,
>> > line=108 (Interpreted frame)
>> >  - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39,
>> > line=139 (Interpreted frame)
>> >
>> >
>> > I think what happened here is that thread 14308 received the akka
>> > "shutdown" message and called System.exit().  This started thread 3865,
>> > which is the JVM shutting itself down.  Part of that process is running
>> > the
>> > shutdown hooks, so it started thread 3987.  That thread is the shutdown
>> > hook from addShutdownHook() in DiskBlockManager.scala, which looks like
>> > this:
>> >
>> >   private def addShutdownHook() {
>> >     localDirs.foreach(localDir =>
>> > Utils.registerShutdownDeleteDir(localDir))
>> >     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local
>> > dirs") {
>> >       override def run() {
>> >         logDebug("Shutdown hook called")
>> >         localDirs.foreach { localDir =>
>> >           try {
>> >             if (!Utils.hasRootAsShutdownDeleteDir(localDir))
>> > Utils.deleteRecursively(localDir)
>> >           } catch {
>> >             case t: Throwable =>
>> >               logError("Exception while deleting local spark dir: " +
>> > localDir, t)
>> >           }
>> >         }
>> >
>> >         if (shuffleSender != null) {
>> >           shuffleSender.stop()
>> >         }
>> >       }
>> >     })
>> >   }
>> >
>> > It goes through and deletes the directories recursively.  I was thinking
>> > there might be some issues with concurrently-running shutdown hooks
>> > deleting things out from underneath each other (shutdown hook javadocs
>> > say
>> > they're all started in parallel if multiple hooks are added) causing the
>> > File.list() in that last thread to take quite some time.
>> >
>> > While I was looking through the stacktrace the JVM finally exited (after
>> > 15-20min at least) so I won't be able to debug more until this bug
>> > strikes
>> > again.
>> >
>> > Any ideas on what might be going on here?
>> >
>> > Thanks!
>> > Andrew
>
>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Andrew Ash <an...@andrewash.com>.
Got a repro locally on my MBP (the other was on a CentOS machine).

Build spark, run a master and a worker with the sbin/start-all.sh script,
then run this in a shell:

import org.apache.spark.storage.StorageLevel._
val s = sc.parallelize(1 to 1000000000).persist(MEMORY_AND_DISK_SER);
s.count

After about a minute, this line appears in the shell logging output:

14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing BlockManager
BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent heart
beats: 57510ms exceeds 45000ms

Ctrl-C the shell.  In jps there is now a worker, a master, and a
CoarseGrainedExecutorBackend.

Run jstack on the CGEBackend JVM, and I got the attached stacktraces.  I
waited around for 15min then kill -9'd the JVM and restarted the process.

I wonder if what's happening here is that the threads that are spewing data
to disk (as that parallelize and persist would do) can write to disk faster
than the cleanup threads can delete from disk.

What do you think of that theory?


Andrew



On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <mr...@gmail.com>wrote:

> shutdown hooks should not take 15 mins are you mentioned !
> On the other hand, how busy was your disk when this was happening ?
> (either due to spark or something else ?)
>
> It might just be that there was a lot of stuff to remove ?
>
> Regards,
> Mridul
>
>
> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <an...@andrewash.com> wrote:
> > Hi Spark devs,
> >
> > Occasionally when hitting Ctrl-C in the scala spark shell on 0.9.0 one of
> > my workers goes dead in the spark master UI.  I'm using the standalone
> > cluster and didn't ever see this while using 0.8.0 so I think it may be a
> > regression.
> >
> > When I prod on the hung CoarseGrainedExecutorBackend JVM with jstack and
> > jmap -heap, it doesn't respond unless I add the -F force flag.  The heap
> > isn't full, but there are some interesting bits in the jstack.  Poking
> > around a little, I think there may be some kind of deadlock in the
> shutdown
> > hooks.
> >
> > Below are the threads I think are most interesting:
> >
> > Thread 14308: (state = BLOCKED)
> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
> >  - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
> >  - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
> >  -
> >
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
> > scala.Function1) @bci=352, line=81 (Interpreted frame)
> >  - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25,
> line=498
> > (Interpreted frame)
> >  - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39, line=456
> > (Interpreted frame)
> >  - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237
> > (Interpreted frame)
> >  - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
> >  - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
> > @bci=4, line=386 (Interpreted frame)
> >  - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260
> > (Compiled frame)
> >  -
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
> > @bci=10, line=1339 (Compiled frame)
> >  -
> >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
> > @bci=11, line=1979 (Compiled frame)
> >  - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14, line=107
> > (Interpreted frame)
> >
> > Thread 3865: (state = BLOCKED)
> >  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
> >  - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
> >  - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
> >  - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106
> > (Interpreted frame)
> >  - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46
> (Interpreted
> > frame)
> >  - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
> >  - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
> >  - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52
> > (Interpreted frame)
> >  - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
> >  - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
> >
> >
> > Thread 3987: (state = BLOCKED)
> >  - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted frame)
> >  - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
> >  - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
> >  - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1,
> > line=466 (Interpreted frame)
> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=9,
> > line=478 (Compiled frame)
> >  -
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
> > @bci=4, line=479 (Compiled frame)
> >  -
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
> > @bci=5, line=478 (Compiled frame)
> >  -
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > scala.Function1) @bci=22, line=33 (Compiled frame)
> >  - scala.collection.mutable.WrappedArray.foreach(scala.Function1) @bci=2,
> > line=34 (Compiled frame)
> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=19,
> > line=478 (Interpreted frame)
> >  -
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
> > @bci=14, line=141 (Interpreted frame)
> >  -
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
> > @bci=5, line=139 (Interpreted frame)
> >  -
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > scala.Function1) @bci=22, line=33 (Compiled frame)
> >  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
> @bci=2,
> > line=108 (Interpreted frame)
> >  - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39,
> > line=139 (Interpreted frame)
> >
> >
> > I think what happened here is that thread 14308 received the akka
> > "shutdown" message and called System.exit().  This started thread 3865,
> > which is the JVM shutting itself down.  Part of that process is running
> the
> > shutdown hooks, so it started thread 3987.  That thread is the shutdown
> > hook from addShutdownHook() in DiskBlockManager.scala, which looks like
> > this:
> >
> >   private def addShutdownHook() {
> >     localDirs.foreach(localDir =>
> Utils.registerShutdownDeleteDir(localDir))
> >     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local
> > dirs") {
> >       override def run() {
> >         logDebug("Shutdown hook called")
> >         localDirs.foreach { localDir =>
> >           try {
> >             if (!Utils.hasRootAsShutdownDeleteDir(localDir))
> > Utils.deleteRecursively(localDir)
> >           } catch {
> >             case t: Throwable =>
> >               logError("Exception while deleting local spark dir: " +
> > localDir, t)
> >           }
> >         }
> >
> >         if (shuffleSender != null) {
> >           shuffleSender.stop()
> >         }
> >       }
> >     })
> >   }
> >
> > It goes through and deletes the directories recursively.  I was thinking
> > there might be some issues with concurrently-running shutdown hooks
> > deleting things out from underneath each other (shutdown hook javadocs
> say
> > they're all started in parallel if multiple hooks are added) causing the
> > File.list() in that last thread to take quite some time.
> >
> > While I was looking through the stacktrace the JVM finally exited (after
> > 15-20min at least) so I won't be able to debug more until this bug
> strikes
> > again.
> >
> > Any ideas on what might be going on here?
> >
> > Thanks!
> > Andrew
>

Re: [0.9.0] Possible deadlock in shutdown hook?

Posted by Mridul Muralidharan <mr...@gmail.com>.
shutdown hooks should not take 15 mins are you mentioned !
On the other hand, how busy was your disk when this was happening ?
(either due to spark or something else ?)

It might just be that there was a lot of stuff to remove ?

Regards,
Mridul


On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <an...@andrewash.com> wrote:
> Hi Spark devs,
>
> Occasionally when hitting Ctrl-C in the scala spark shell on 0.9.0 one of
> my workers goes dead in the spark master UI.  I'm using the standalone
> cluster and didn't ever see this while using 0.8.0 so I think it may be a
> regression.
>
> When I prod on the hung CoarseGrainedExecutorBackend JVM with jstack and
> jmap -heap, it doesn't respond unless I add the -F force flag.  The heap
> isn't full, but there are some interesting bits in the jstack.  Poking
> around a little, I think there may be some kind of deadlock in the shutdown
> hooks.
>
> Below are the threads I think are most interesting:
>
> Thread 14308: (state = BLOCKED)
>  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
>  - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
>  - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
>  -
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
> scala.Function1) @bci=352, line=81 (Interpreted frame)
>  - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25, line=498
> (Interpreted frame)
>  - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39, line=456
> (Interpreted frame)
>  - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237
> (Interpreted frame)
>  - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
>  - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
> @bci=4, line=386 (Interpreted frame)
>  - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260
> (Compiled frame)
>  -
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
> @bci=10, line=1339 (Compiled frame)
>  -
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
> @bci=11, line=1979 (Compiled frame)
>  - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14, line=107
> (Interpreted frame)
>
> Thread 3865: (state = BLOCKED)
>  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
>  - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
>  - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
>  - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106
> (Interpreted frame)
>  - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46 (Interpreted
> frame)
>  - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
>  - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
>  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
>  - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52
> (Interpreted frame)
>  - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
>  - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
>
>
> Thread 3987: (state = BLOCKED)
>  - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted frame)
>  - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
>  - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
>  - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1,
> line=466 (Interpreted frame)
>  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=9,
> line=478 (Compiled frame)
>  -
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
> @bci=4, line=479 (Compiled frame)
>  -
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
> @bci=5, line=478 (Compiled frame)
>  -
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> scala.Function1) @bci=22, line=33 (Compiled frame)
>  - scala.collection.mutable.WrappedArray.foreach(scala.Function1) @bci=2,
> line=34 (Compiled frame)
>  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=19,
> line=478 (Interpreted frame)
>  -
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
> @bci=14, line=141 (Interpreted frame)
>  -
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
> @bci=5, line=139 (Interpreted frame)
>  -
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> scala.Function1) @bci=22, line=33 (Compiled frame)
>  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2,
> line=108 (Interpreted frame)
>  - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39,
> line=139 (Interpreted frame)
>
>
> I think what happened here is that thread 14308 received the akka
> "shutdown" message and called System.exit().  This started thread 3865,
> which is the JVM shutting itself down.  Part of that process is running the
> shutdown hooks, so it started thread 3987.  That thread is the shutdown
> hook from addShutdownHook() in DiskBlockManager.scala, which looks like
> this:
>
>   private def addShutdownHook() {
>     localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
>     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local
> dirs") {
>       override def run() {
>         logDebug("Shutdown hook called")
>         localDirs.foreach { localDir =>
>           try {
>             if (!Utils.hasRootAsShutdownDeleteDir(localDir))
> Utils.deleteRecursively(localDir)
>           } catch {
>             case t: Throwable =>
>               logError("Exception while deleting local spark dir: " +
> localDir, t)
>           }
>         }
>
>         if (shuffleSender != null) {
>           shuffleSender.stop()
>         }
>       }
>     })
>   }
>
> It goes through and deletes the directories recursively.  I was thinking
> there might be some issues with concurrently-running shutdown hooks
> deleting things out from underneath each other (shutdown hook javadocs say
> they're all started in parallel if multiple hooks are added) causing the
> File.list() in that last thread to take quite some time.
>
> While I was looking through the stacktrace the JVM finally exited (after
> 15-20min at least) so I won't be able to debug more until this bug strikes
> again.
>
> Any ideas on what might be going on here?
>
> Thanks!
> Andrew