Posted to issues@spark.apache.org by "Ilayaperumal Gopinathan (JIRA)" <ji...@apache.org> on 2014/12/31 11:30:13 UTC

[jira] [Comment Edited] (SPARK-2892) Socket Receiver does not stop when streaming context is stopped

    [ https://issues.apache.org/jira/browse/SPARK-2892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14262078#comment-14262078 ] 

Ilayaperumal Gopinathan edited comment on SPARK-2892 at 12/31/14 10:29 AM:
---------------------------------------------------------------------------

[~joshrosen] Yeah, serialization could be the real issue here.
 But after trying the suggested change in my environment (where I call streamingContext.stop()), the active streaming job is still only cancelled upon SparkContext shutdown, and I see the following:

ERROR sparkDriver-akka.actor.default-dispatcher-19 scheduler.TaskSchedulerImpl - Lost executor 0 on 192.168.2.6: remote Akka client disassociated
WARN sparkDriver-akka.actor.default-dispatcher-20 remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://sparkExecutor@192.168.2.6:54259] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
WARN sparkDriver-akka.actor.default-dispatcher-19 scheduler.TaskSetManager - Lost task 0.0 in stage 8.0 (TID 76, 192.168.2.6): ExecutorLostFailure (executor 0 lost)
 ERROR sparkDriver-akka.actor.default-dispatcher-19 cluster.SparkDeploySchedulerBackend - Asked to remove non-existent executor 0
ERROR sparkDriver-akka.actor.default-dispatcher-19 cluster.SparkDeploySchedulerBackend - Asked to remove non-existent executor 0
WARN main-EventThread scheduler.ReceiverTracker - All of the receivers have not deregistered, Map(0 -> ReceiverInfo(0,MessageBusReceiver-0,Actor[akka.tcp://sparkExecutor@192.168.2.6:54279/user/Receiver-0-1420019455242#1553271505],true,192.168.2.6,,))
Exception in thread "Thread-42" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:701)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1428)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1375)
	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
	at akka.actor.ActorCell.terminate(ActorCell.scala:369)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
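
For reference, the stop sequence I am exercising is roughly the following (a minimal sketch; the app name and the placeholder for registering the custom receiver are illustrative, not the exact code from my environment):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverStopSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("receiver-stop-test")
    val ssc = new StreamingContext(conf, Seconds(1))

    // ... register the custom receiver stream and output operations here ...

    ssc.start()
    Thread.sleep(10000)
    // Stop only the streaming context and keep the SparkContext alive.
    // This is the point where the receiver job appears to hang until
    // the SparkContext itself is shut down.
    ssc.stop(stopSparkContext = false, stopGracefully = true)
  }
}
```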

Let me know what you think.


was (Author: ilayaperumalg):
[~joshrosen] Yeah, serialization could be the real issue here.
 But after trying the suggested change in my environment (where I call streamingContext.stop()), the active streaming job is still only cancelled upon SparkContext shutdown, and I see the following:

ERROR akka.actor.default-dispatcher-19 scheduler.TaskSchedulerImpl - Lost executor 0 on 192.168.2.6: remote Akka client disassociated
WARN akka.actor.default-dispatcher-20 remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://sparkExecutor@192.168.2.6:54259] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
WARN akka.actor.default-dispatcher-19 scheduler.TaskSetManager - Lost task 0.0 in stage 8.0 (TID 76, 192.168.2.6): ExecutorLostFailure (executor 0 lost)
 ERROR akka.actor.default-dispatcher-19 cluster.SparkDeploySchedulerBackend - Asked to remove non-existent executor 0
ERROR akka.actor.default-dispatcher-19 cluster.SparkDeploySchedulerBackend - Asked to remove non-existent executor 0
WARN main-EventThread scheduler.ReceiverTracker - All of the receivers have not deregistered, Map(0 -> ReceiverInfo(0,MessageBusReceiver-0,Actor[akka.tcp://sparkExecutor@192.168.2.6:54279/user/Receiver-0-1420019455242#1553271505],true,192.168.2.6,,))
Exception in thread "Thread-42" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:701)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1428)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1375)
	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
	at akka.actor.ActorCell.terminate(ActorCell.scala:369)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Let me know what you think.

> Socket Receiver does not stop when streaming context is stopped
> ---------------------------------------------------------------
>
>                 Key: SPARK-2892
>                 URL: https://issues.apache.org/jira/browse/SPARK-2892
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.0.2
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>            Priority: Critical
>
> Running NetworkWordCount with
> {quote}      
> ssc.start(); Thread.sleep(10000); ssc.stop(stopSparkContext = false); Thread.sleep(60000)
> {quote}
> gives the following error
> {quote}
> 14/08/06 18:37:13 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 10047 ms on localhost (1/1)
> 14/08/06 18:37:13 INFO DAGScheduler: Stage 0 (runJob at ReceiverTracker.scala:275) finished in 10.056 s
> 14/08/06 18:37:13 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
> 14/08/06 18:37:13 INFO SparkContext: Job finished: runJob at ReceiverTracker.scala:275, took 10.179263 s
> 14/08/06 18:37:13 INFO ReceiverTracker: All of the receivers have been terminated
> 14/08/06 18:37:13 WARN ReceiverTracker: All of the receivers have not deregistered, Map(0 -> ReceiverInfo(0,SocketReceiver-0,null,false,localhost,Stopped by driver,))
> 14/08/06 18:37:13 INFO ReceiverTracker: ReceiverTracker stopped
> 14/08/06 18:37:13 INFO JobGenerator: Stopping JobGenerator immediately
> 14/08/06 18:37:13 INFO RecurringTimer: Stopped timer for JobGenerator after time 1407375433000
> 14/08/06 18:37:13 INFO JobGenerator: Stopped JobGenerator
> 14/08/06 18:37:13 INFO JobScheduler: Stopped JobScheduler
> 14/08/06 18:37:13 INFO StreamingContext: StreamingContext stopped successfully
> 14/08/06 18:37:43 INFO SocketReceiver: Stopped receiving
> 14/08/06 18:37:43 INFO SocketReceiver: Closed socket to localhost:9999
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
