Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:16:54 UTC

[jira] [Resolved] (SPARK-22679) It's slow to stop streaming context

     [ https://issues.apache.org/jira/browse/SPARK-22679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-22679.
----------------------------------
    Resolution: Incomplete

> It's slow to stop streaming context
> -----------------------------------
>
>                 Key: SPARK-22679
>                 URL: https://issues.apache.org/jira/browse/SPARK-22679
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.1.1
>            Reporter: wuyonghua
>            Priority: Minor
>              Labels: bulk-closed
>
> The following simple program reproduces the issue.
> class QueueDStream[T: scala.reflect.ClassTag](
>     @transient ssc: org.apache.spark.streaming.StreamingContext,
>     val queue: scala.collection.mutable.Queue[org.apache.spark.rdd.RDD[T]],
>     defaultRDD: org.apache.spark.rdd.RDD[T]
>   ) extends org.apache.spark.streaming.dstream.InputDStream[T](ssc) {
>   override def start(): Unit = {}
>   override def stop(): Unit = {}
>
>   override def compute(validTime: org.apache.spark.streaming.Time): Option[org.apache.spark.rdd.RDD[T]] = {
>     // Emit the next queued RDD if there is one; otherwise fall back to
>     // defaultRDD, or None when defaultRDD is null.
>     if (queue.nonEmpty) Some(queue.dequeue()) else Option(defaultRDD)
>   }
> }
> def main() {
>     val accum = sc.accumulator(0, "End Accumulator")
>     println(">>> create streamingContext.")
>     val ssc = new org.apache.spark.streaming.StreamingContext(sc, org.apache.spark.streaming.Milliseconds(10000))
>     val endMarkerToken:String = "_END_"
>     val endMakerRDD = sc.makeRDD(Array(endMarkerToken), 1)
>     def _endMarkerFilter(source1 : org.apache.spark.streaming.dstream.DStream[String]) : org.apache.spark.streaming.dstream.DStream[String] = {
>         // Drop the end-marker line so it never reaches downstream output.
>         source1.filter(line => !line.contains("_END_"))
>     }
>     def func() : org.apache.spark.streaming.dstream.DStream[String] = {
>         // Simulate Stream... use full package name for Queue to avoid potential conflict with java.util.Queue
>         val data = Array(
>         "count,first_location_id,last_location_id",
>         "2000-01-01 01:00:00.1,a,1",
>         "2000-01-01 01:00:00.0,a,2"
>         )
>         val queue = scala.collection.mutable.Queue(sc.parallelize(data))
>         val textSource = new QueueDStream(ssc, queue, endMakerRDD)
>         textSource.foreachRDD(rdd => { rdd.foreach( item => {if (item == "_END_") {accum += 1}} ) })
>         val retval = _endMarkerFilter(textSource)
>         retval
>     }
>     val __1 = func()
>     __1.print()
>     println(">>> Start streaming context.")
>     ssc.start()
>     import scala.concurrent.ExecutionContext.Implicits.global
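>     // Watcher: poll once per second and stop the context gracefully once the
>     // end marker has been seen or after 20 seconds, whichever comes first.
>     val stopFunc = scala.concurrent.Future {
>       var isRun = true
>       var duration = 0
>       while (isRun) {
>         Thread.sleep(1000)
>         duration += 1
>         if (accum.value > 0 || duration >= 20) {
>           println("### STOP SSC ###")
>           ssc.stop(false, true) // stopSparkContext = false, stopGracefully = true
>           duration = 0
>           isRun = false
>         }
>       }
>     }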
>     ssc.awaitTermination()
>     println(">>> Streaming context terminated.")
> }
> -----------
> Run main. The streaming context takes 4 batches (at least 40 seconds with the 10-second batch interval) to stop.
> However, if a stop check is added to org.apache.spark.streaming.util.RecurringTimer::triggerActionForNextInterval, as below:
> private def triggerActionForNextInterval(): Unit = {
>       clock.waitTillTime(nextTime)
>       if (!stopped) {
>         callback(nextTime)
>         prevTime = nextTime
>         nextTime += period
>         logDebug("Callback for " + name + " called at time " + prevTime)
>       }
> }
> then the streaming context stops after only two batches.
> In addition, when the batch interval is small (e.g. 0.1 second), stopping the streaming context gracefully takes more time.
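
The effect of the proposed guard can be illustrated outside Spark with a small stand-alone model. This is only a sketch under stated assumptions: SketchTimer and SketchTimerDemo are hypothetical names invented here, the class is a simplified stand-in and not Spark's RecurringTimer, and the 200 ms period and 300 ms delay are arbitrary. It shows that when stop() arrives while the timer thread is waiting for the next tick, checking the stopped flag after the wait skips one callback that would otherwise still fire.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, simplified stand-in for a recurring timer (NOT Spark's class).
// checkStopAfterWait toggles the guard proposed in the report.
class SketchTimer(periodMs: Long, checkStopAfterWait: Boolean)(callback: Long => Unit) {
  @volatile private var stopped = false
  private var nextTime = System.currentTimeMillis() + periodMs

  private val thread = new Thread(new Runnable {
    override def run(): Unit = {
      while (!stopped) {
        // Wait until the next tick is due.
        val waitMs = nextTime - System.currentTimeMillis()
        if (waitMs > 0) Thread.sleep(waitMs)
        // Proposed guard: skip the callback if stop() arrived during the wait.
        if (!checkStopAfterWait || !stopped) {
          callback(nextTime)
          nextTime += periodMs
        }
      }
    }
  })
  thread.setDaemon(true)

  def start(): Unit = thread.start()

  // Request a stop and wait for the timer thread to finish its current wait.
  def stop(): Unit = {
    stopped = true
    thread.join()
  }
}

object SketchTimerDemo {
  // Returns how many callbacks fired when stop() is requested mid-interval.
  def run(checkStopAfterWait: Boolean): Int = {
    val fired = new AtomicInteger(0)
    val timer = new SketchTimer(200L, checkStopAfterWait)(_ => fired.incrementAndGet())
    timer.start()
    Thread.sleep(300) // the first tick is due at about 200 ms, so stop() lands inside the second interval
    timer.stop()
    fired.get()
  }

  def main(args: Array[String]): Unit = {
    println("callbacks without guard: " + run(checkStopAfterWait = false))
    println("callbacks with guard:    " + run(checkStopAfterWait = true))
  }
}
```

In this model the run with the guard should report fewer callbacks than the run without it, because the tick that becomes due after stop() is dropped. Whether the same saving carries over to a real graceful shutdown depends on the rest of the streaming scheduler, which this sketch does not model.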


