Posted to issues@spark.apache.org by "yuemeng (JIRA)" <ji...@apache.org> on 2015/06/26 16:52:04 UTC

[jira] [Commented] (SPARK-8663) Driver will hang if a job is submitted during the SparkContext stop interval

    [ https://issues.apache.org/jira/browse/SPARK-8663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14602987#comment-14602987 ] 

yuemeng commented on SPARK-8663:
--------------------------------

The driver log looks like this:
15/06/25 23:16:16 INFO DAGScheduler: Executor lost: 1 (epoch 1)
15/06/25 23:16:16 INFO BlockManagerMasterActor: Trying to remove executor 1 from BlockManagerMaster.
15/06/25 23:16:16 INFO BlockManagerMasterActor: Removing block manager BlockManagerId(1, 9.96.1.223, 23577)
15/06/25 23:16:16 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor
15/06/25 23:16:45 ERROR ContextCleaner: Error cleaning broadcast 3512
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137)
        at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
        at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
        at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:199)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:159)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:150)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:150)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:144)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:144)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1550)
        at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:143)
        at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65)
15/06/25 23:16:45 INFO DAGScheduler: Stopping DAGScheduler
15/06/25 23:16:45 INFO YarnClientSchedulerBackend: Shutting down all executors
15/06/25 23:16:45 INFO YarnClientSchedulerBackend: Asking each executor to shut down
15/06/25 23:16:45 INFO DAGScheduler: Job 3555 failed: count at <console>:18, took 29.811052 s
15/06/25 23:16:45 INFO DAGScheduler: Job 3539 failed: count at <console>:18, took 30.089501 s
15/06/25 23:16:45 INFO DAGScheduler: Job 3553 failed: count at <console>:18, took 29.842839 s
15/06/25 23:16:45 WARN BlockManagerMaster: Failed to remove broadcast 3512 with removeFromMaster = true - Ask timed out on [Actor[akka.tcp://sparkExecutor@DS-222:23604/user/BlockManagerActor1#1981879442]] after [30000 ms]}
calcFunc start
calcFunc start
15/06/25 23:16:45 INFO DAGScheduler: Job 3554 failed: count at <console>:18, took 29.827635 s
15/06/25 23:16:45 INFO SparkContext: Starting job: count at <console>:18
15/06/25 23:16:45 INFO SparkContext: Starting job: count at <console>:18
15/06/25 23:16:45 INFO YarnClientSchedulerBackend: Stopped
15/06/25 23:16:45 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkYarnAM@DS-222:23129]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: DS-222/9.96.1.222:23129
15/06/25 23:16:46 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/06/25 23:16:46 INFO MemoryStore: MemoryStore cleared
15/06/25 23:16:46 INFO BlockManager: BlockManager stopped
15/06/25 23:16:46 INFO BlockManagerMaster: BlockManagerMaster stopped
15/06/25 23:16:46 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/06/25 23:16:46 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/06/25 23:16:46 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/06/25 23:16:46 INFO SparkContext: Successfully stopped SparkContext
and the driver thread dump looks like this (note in the log above that new jobs are still starting at 23:16:45, even as the DAGScheduler is stopping):
"ForkJoinPool-3-worker-3" daemon prio=10 tid=0x0000000000991000 nid=0x3dab waiting on condition [0x00007fc9507dd000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000000fe9ea670> (a scala.concurrent.forkjoin.ForkJoinPool)
	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"DestroyJavaVM" prio=10 tid=0x00007fc94428f000 nid=0x2597 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Thread-59" prio=10 tid=0x00007fc9444bf000 nid=0x3917 in Object.wait() [0x00007fc942b79000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x00000000f8569b00> (a org.apache.spark.scheduler.JobWaiter)
	at java.lang.Object.wait(Object.java:503)
	at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
	- locked <0x00000000f8569b00> (a org.apache.spark.scheduler.JobWaiter)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:514)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1285)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1317)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1331)
	at org.apache.spark.rdd.RDD.count(RDD.scala:910)
	at com.huawei.spark.test.DriverThread$$anon$1.run(DriverThread.scala:28)
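
The dump pins down the hang: Thread-59 is blocked inside JobWaiter.awaitResult(), in Object.wait() on the JobWaiter monitor, waiting for the DAGScheduler to deliver the job-completion (or job-failure) event that would call notifyAll(). Because the job was submitted after the scheduler had already begun stopping, that event is never posted, so the wait never returns. A minimal sketch of the same wait/notify pattern (a hypothetical class, not Spark's actual JobWaiter):

// Sketch of a JobWaiter-style latch: awaitResult() blocks until finish()
// is called from the scheduler's event loop. If that loop has already
// been stopped, finish() never runs and awaitResult() blocks forever,
// which is exactly what Thread-59 shows above.
class WaiterSketch {
  private var finished = false

  def finish(): Unit = synchronized {
    finished = true
    notifyAll()
  }

  def awaitResult(): Unit = synchronized {
    while (!finished) {
      wait() // never returns if finish() is never called
    }
  }
}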




> Driver will hang if a job is submitted during the SparkContext stop interval
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-8663
>                 URL: https://issues.apache.org/jira/browse/SPARK-8663
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.0, 1.1.1, 1.2.0
>         Environment: SUSE Linux Enterprise Server 11 SP3 (x86_64)
>            Reporter: yuemeng
>             Fix For: 1.0.0, 1.1.1, 1.2.2
>
>
> The driver process will hang if a job is submitted during the sc.stop() interval, i.e. the window from the moment SparkContext.stop() starts until it finishes.
> The probability of hitting this window is very small, but when it happens, the driver process never exits.
> Steps to reproduce:
> 1) Modify the source code so that the SparkContext stop() path sleeps for 2s, widening the race window;
> in my case I made the DAGScheduler stop() method sleep 2s.
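> For example, a hypothetical sketch of the change (the exact code differs across Spark versions):
>   // inside DAGScheduler:
>   def stop() {
>     Thread.sleep(2000) // artificial delay to widen the stop window
>     // ... original shutdown logic unchanged ...
>   }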
> 2) Submit an application with code like:
> import org.apache.spark.{SparkConf, SparkContext}
>
> object DriverThreadTest {
>   def main(args: Array[String]): Unit = {
>     val sconf = new SparkConf().setAppName("TestJobWaitor")
>     val sc = new SparkContext(sconf)
>     Thread.sleep(5000)
>
>     // Thread 1: keeps submitting count jobs in a loop.
>     val t = new Thread {
>       override def run(): Unit = {
>         while (true) {
>           try {
>             val rdd = sc.parallelize(1 to 1000)
>             var i = 0
>             println("calcFunc start")
>             while (i < 10) {
>               i += 1
>               rdd.count()
>             }
>             println("calcFunc end")
>           } catch {
>             case e: Exception => e.printStackTrace()
>           }
>         }
>       }
>     }
>     t.start()
>
>     // Thread 2: stops the SparkContext 2s later, while jobs are still
>     // being submitted.
>     val t2 = new Thread {
>       override def run(): Unit = {
>         Thread.sleep(2000)
>         println("stop sc thread")
>         sc.stop()
>         println("sc already stopped")
>       }
>     }
>     t2.start()
>   }
> }
> The driver will never exit.
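>
> As an application-side workaround, a minimal sketch (the lock, flag, and helper names are mine, not Spark API): serialize job submission and stop() behind one lock, so no job can start inside the stop window.
>
> import org.apache.spark.SparkContext
>
> object SafeStop {
>   private val lock = new Object
>   private var stopped = false
>
>   // Submissions and stop take the same lock, so no job can begin once
>   // stop has been requested, and stop waits for any in-flight job.
>   def runIfActive(sc: SparkContext)(job: SparkContext => Unit): Unit =
>     lock.synchronized { if (!stopped) job(sc) }
>
>   def safeStop(sc: SparkContext): Unit = lock.synchronized {
>     stopped = true
>     sc.stop()
>   }
> }
>
> // usage: SafeStop.runIfActive(sc)(c => c.parallelize(1 to 1000).count())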


