Posted to dev@spark.apache.org by vonnagy <iv...@vadio.com> on 2015/11/07 21:17:58 UTC

Calling stop on StreamingContext locks up

If I have a streaming job (Spark 1.5.1) and attempt to stop the stream after
the first batch, the system locks up and never completes. The pseudocode
below stops the stream once the batch-completed notification fires. I have
traced the lockup to the call `listener.stop()` in JobScheduler (line 114),
which attempts to join the listener thread in AsynchronousListenerBus. That
thread never exits because it keeps receiving
`SparkListenerExecutorMetricsUpdate` events from the DAGScheduler, so the
join never returns.

Any thoughts/ideas on how I can effectively stop the stream after the first
batch would be greatly appreciated.

Pseudocode example:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class SomeJob {

    // createStreamingContext() and getStream are placeholders for the
    // application-specific setup.
    val ssc = createStreamingContext()
    val listener = new MyListener(ssc)
    ssc.addStreamingListener(listener)

    val stream = getStream

    stream.foreachRDD { rdd =>
        // Do something with the data
    }

    ssc.start()
    ssc.awaitTermination()
}

class MyListener(ctx: StreamingContext) extends StreamingListener {
    override def onBatchCompleted(
        batchCompleted: StreamingListenerBatchCompleted): Unit = synchronized {
        ctx.stop(false, false)
        // NOTE: I get the same results with ctx.stop(), ctx.stop(true),
        // ctx.stop(true, true), or ctx.stop(false, false)
    }
}
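
One idea I have not verified yet is to issue the stop from a separate thread,
so that stop() is never called from the listener-bus thread that is delivering
the callback. A minimal sketch of that idea, reusing the StreamingContext from
the pseudocode above (the listener name here is just for illustration):

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class StopFromSeparateThreadListener(ctx: StreamingContext) extends StreamingListener {
    override def onBatchCompleted(
        batchCompleted: StreamingListenerBatchCompleted): Unit = {
        // Hand the stop off to a new thread so that stop() does not end
        // up joining the very listener-bus thread running this callback.
        new Thread("stop-streaming-context") {
            override def run(): Unit = {
                ctx.stop(stopSparkContext = false, stopGracefully = false)
            }
        }.start()
    }
}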





Re: Calling stop on StreamingContext locks up

Posted by vonnagy <iv...@vadio.com>.
Hi Ted,

Your fix addresses the issue for me. Thanks again for your help; I saw the
PR you submitted against master.

Ivan





Re: Calling stop on StreamingContext locks up

Posted by Ted Yu <yu...@gmail.com>.
Would the following change work for you?

diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index 61b5a4c..c330d25 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -66,6 +66,7 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
         self.synchronized {
           processingEvent = true
         }
+        if (stopped.get()) return
         try {
           val event = eventQueue.poll
           if (event == null) {
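
For context, here is a simplified sketch of the consumer thread's run loop in
AsynchronousListenerBus showing where that check lands (condensed from the
1.5 source, not a verbatim copy). With the early return, the thread can exit
even while `SparkListenerExecutorMetricsUpdate` events keep arriving, so the
join in stop() completes:

override def run(): Unit = {
    while (true) {
        eventLock.acquire()           // wait for an event to be posted (or for stop)
        self.synchronized { processingEvent = true }
        if (stopped.get()) return     // the added line: exit once stop() has been called
        try {
            val event = eventQueue.poll
            if (event == null) {
                // stop() releases the lock without queueing an event, so a
                // null here also means the bus has been stopped
                return
            }
            postToAll(event)          // deliver the event to every registered listener
        } finally {
            self.synchronized { processingEvent = false }
        }
    }
}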
