Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2015/06/22 21:01:00 UTC

[jira] [Commented] (SPARK-7563) OutputCommitCoordinator.stop() should only be executed in driver

    [ https://issues.apache.org/jira/browse/SPARK-7563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14596449#comment-14596449 ] 

Josh Rosen commented on SPARK-7563:
-----------------------------------

This still needs a backport.  I'll do it eventually but it would be nice if someone else could submit a PR; I'll review.

> OutputCommitCoordinator.stop() should only be executed in driver
> ----------------------------------------------------------------
>
>                 Key: SPARK-7563
>                 URL: https://issues.apache.org/jira/browse/SPARK-7563
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.1
>         Environment: Red Hat Enterprise Linux Server release 7.0 (Maipo)
> Spark 1.3.1 Release
>            Reporter: Hailong Wen
>            Priority: Critical
>              Labels: backport-needed
>             Fix For: 1.4.0
>
>
> I am from the IBM Platform Symphony team, and we are integrating Spark 1.3.1 with EGO (a resource management product).
> In EGO we use a fine-grained dynamic allocation policy, and each executor exits once all of its tasks are done. When testing *spark-shell*, we found that when the executor of the first job exits, it stops the OutputCommitCoordinator, which causes all subsequent jobs to fail. Details are as follows:
> We got the following error in the executor when submitting a job in *spark-shell* for the second time (the first job submission succeeded):
> {noformat}
> 15/05/11 04:02:31 INFO spark.util.AkkaUtils: Connecting to OutputCommitCoordinator: akka.tcp://sparkDriver@whlspark01:50452/user/OutputCommitCoordinator
> Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@whlspark01:50452/), Path(/user/OutputCommitCoordinator)]
>         at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>         at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>         at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>         at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>         at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>         at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
>         at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>         at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>         at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>         at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
>         at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
>         at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
> On the driver side, we see a log message indicating that the OutputCommitCoordinator was stopped after the first submission:
> {noformat}
> 15/05/11 04:01:23 INFO spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor: OutputCommitCoordinator stopped!
> {noformat}
> We examined the code of OutputCommitCoordinator and found that the executor reuses a reference to the driver's OutputCommitCoordinatorActor. So when an executor exits, it eventually calls SparkEnv.stop():
> {noformat}
>   private[spark] def stop() {
>     isStopped = true
>     pythonWorkers.foreach { case(key, worker) => worker.stop() }
>     Option(httpFileServer).foreach(_.stop())
>     mapOutputTracker.stop()
>     shuffleManager.stop()
>     broadcastManager.stop()
>     blockManager.stop()
>     blockManager.master.stop()
>     metricsSystem.stop()
>     outputCommitCoordinator.stop()      <--------------- 
>     actorSystem.shutdown()
>     ......
> {noformat} 
> and in OutputCommitCoordinator.stop():
> {noformat}
>   def stop(): Unit = synchronized {
>     coordinatorActor.foreach(_ ! StopCoordinator)
>     coordinatorActor = None
>     authorizedCommittersByStage.clear()
>   }
> {noformat}
> We currently work around this problem by adding an "isDriver" attribute to OutputCommitCoordinator and checking whether the "stop" call comes from the driver or from an executor:
> {noformat}
> diff SparkEnv.scala
> 360c360
> <       new OutputCommitCoordinator(conf, isDriver)
> ---
> >       new OutputCommitCoordinator(conf)
> diff OutputCommitCoordinator.scala
> 43c43
> < private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean = false) extends Logging {
> ---
> > private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
> 137,141c137,139
> <     if (isDriver) {
> <       coordinatorActor.foreach(_ ! StopCoordinator)
> <       coordinatorActor = None
> <       authorizedCommittersByStage.clear()
> <     }
> ---
> >     coordinatorActor.foreach(_ ! StopCoordinator)
> >     coordinatorActor = None
> >     authorizedCommittersByStage.clear()
> {noformat}
> We propose applying this fix in a future release, since the problem may affect all *spark-shell* usage under the dynamic allocation model.
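
For anyone picking up the backport, here is a minimal, self-contained sketch of the behaviour the isDriver guard is meant to give. It is not the Spark code: FakeCoordinatorActor and CoordinatorSketch are hypothetical stand-ins with no Akka dependency, and the shared object only models the fact that executors hold a reference to the driver's actor.

```scala
// Hypothetical stand-in for the driver's OutputCommitCoordinatorActor (not a Spark class).
final class FakeCoordinatorActor {
  @volatile var running: Boolean = true
  def stop(): Unit = { running = false }
}

// Simplified model of OutputCommitCoordinator: every instance, on the driver or on
// an executor, may hold a reference to the single actor that lives on the driver.
final class CoordinatorSketch(isDriver: Boolean) {
  var coordinatorActor: Option[FakeCoordinatorActor] = None

  def stop(): Unit = synchronized {
    // Only the driver owns the actor, so only the driver may stop it.
    if (isDriver) {
      coordinatorActor.foreach(_.stop())
      coordinatorActor = None
    }
  }
}

object CoordinatorSketchDemo {
  def main(args: Array[String]): Unit = {
    val driverActor = new FakeCoordinatorActor

    val driverSide = new CoordinatorSketch(isDriver = true)
    driverSide.coordinatorActor = Some(driverActor)

    val executorSide = new CoordinatorSketch(isDriver = false)
    executorSide.coordinatorActor = Some(driverActor)

    // An executor exiting after the first job must leave the driver's actor alone.
    executorSide.stop()
    println(s"after executor stop: running = ${driverActor.running}") // true

    // Stopping on the driver actually shuts the coordinator down.
    driverSide.stop()
    println(s"after driver stop: running = ${driverActor.running}") // false
  }
}
```

Without the guard, the executorSide.stop() call above would flip running to false, which corresponds to the "OutputCommitCoordinator stopped!" message on the driver and the ActorNotFound error on the next job.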
