Posted to user@spark.apache.org by Ian Ferreira <ia...@hotmail.com> on 2014/04/28 17:38:29 UTC

Running parallel jobs in the same driver with Futures?

I recall asking about this before, and I think Matei suggested it was safe, but is the
scheduler actually thread-safe?

I am running MLlib algorithms as futures in the same driver, all using the same
dataset as input, and I get this error:

14/04/28 08:29:48 ERROR TaskSchedulerImpl: Exception in statusUpdate
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$2@75974c8b rejected from java.util.concurrent.ThreadPoolExecutor@202275fd[Terminated, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 622]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at org.apache.spark.scheduler.TaskResultGetter.enqueueSuccessfulTask(TaskResultGetter.scala:46)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$liftedTree1$1$1.apply(TaskSchedulerImpl.scala:269)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$liftedTree1$1$1.apply(TaskSchedulerImpl.scala:266)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.TaskSchedulerImpl.liftedTree1$1(TaskSchedulerImpl.scala:266)
    at org.apache.spark.scheduler.TaskSchedulerImpl.statusUpdate(TaskSchedulerImpl.scala:251)
    at org.apache.spark.scheduler.local.LocalActor$$anonfun$receive$1.applyOrElse(LocalBackend.scala:58)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
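The trace shows TaskResultGetter handing a result to a java.util.concurrent.ThreadPoolExecutor that is already in the Terminated state, and the JDK's default AbortPolicy rejecting it. As a minimal, Spark-free sketch of that JDK behaviour (the object and method names below are illustrative only, not part of Spark), submitting work to a pool after it has been shut down produces the same kind of "rejected from ...[Terminated, ...]" message:

```scala
import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit}

object TerminatedPoolDemo {
  // Hypothetical helper: returns the message produced when a task is
  // submitted to an executor that has already been shut down and terminated.
  def submitAfterShutdown(): String = {
    val pool = Executors.newFixedThreadPool(1) // backed by a ThreadPoolExecutor
    pool.shutdown()                            // stop accepting new tasks
    pool.awaitTermination(5, TimeUnit.SECONDS) // wait until the state is Terminated
    try {
      pool.execute(new Runnable { def run(): Unit = () })
      "accepted"
    } catch {
      // The default AbortPolicy throws here, as in ThreadPoolExecutor.reject above
      case e: RejectedExecutionException => e.getMessage
    }
  }

  def main(args: Array[String]): Unit =
    println(submitAfterShutdown())
}
```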


Pseudo code


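import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// doSomething is a placeholder for one MLlib job on the shared dataset
val a = Future { doSomething }
val b = Future { doSomething }

val numbers = List(a, b)
val futNumbers = Future.sequence(numbers)

// Await the mapped future so the handling code has finished, not just the jobs
val handled = futNumbers.map { listOfNumbers => /* your handling code here */ }

Await.result(handled, 600.seconds)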
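A stdlib-only sketch of the same Futures pattern, assuming each job is a blocking call: the jobs run on a dedicated thread pool, the caller awaits the combined future, and only after that is the shared resource released. The runJob function is a hypothetical stand-in for an MLlib training run; in a real driver the release step would be something like sc.stop(), which this sketch does not call.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelJobsDemo {
  // Hypothetical stand-in for a blocking job (e.g. one MLlib training run).
  def runJob(id: Int, data: Seq[Int]): Int = data.map(_ * id).sum

  def runAll(data: Seq[Int]): List[Int] = {
    // Dedicated pool so blocking jobs do not tie up the global ExecutionContext.
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val jobs = List(Future(runJob(1, data)), Future(runJob(2, data)))
      // Block until every job has completed before releasing shared resources.
      Await.result(Future.sequence(jobs), 600.seconds)
    } finally {
      // Shutdown happens only after the await returns (or throws).
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    println(runAll(1 to 10))
}
```

Creating the pool explicitly, rather than relying on the global ExecutionContext, keeps long-running jobs from occupying threads other code may depend on, and makes the point where resources are released explicit.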