Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:16:54 UTC
[jira] [Resolved] (SPARK-22679) It's slow to stop streaming context
[ https://issues.apache.org/jira/browse/SPARK-22679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-22679.
----------------------------------
Resolution: Incomplete
> It's slow to stop streaming context
> -----------------------------------
>
> Key: SPARK-22679
> URL: https://issues.apache.org/jira/browse/SPARK-22679
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.1.1
> Reporter: wuyonghua
> Priority: Minor
> Labels: bulk-closed
>
> A simple program to reproduce the issue is included below.
> class QueueDStream[T: scala.reflect.ClassTag](
>     @transient ssc: org.apache.spark.streaming.StreamingContext,
>     val queue: scala.collection.mutable.Queue[org.apache.spark.rdd.RDD[T]],
>     defaultRDD: org.apache.spark.rdd.RDD[T]
> ) extends org.apache.spark.streaming.dstream.InputDStream[T](ssc) {
>   override def start() { }
>   override def stop() { }
>
>   override def compute(validTime: org.apache.spark.streaming.Time): Option[org.apache.spark.rdd.RDD[T]] = {
>     val buffer = new scala.collection.mutable.ArrayBuffer[org.apache.spark.rdd.RDD[T]]()
>     if (queue.size > 0) {
>       buffer += queue.dequeue()
>     }
>     if (buffer.size > 0) {
>       Some(buffer.head)
>     } else if (defaultRDD != null) {
>       Some(defaultRDD)
>     } else {
>       None
>     }
>   }
> }
> def main() {
>   val accum = sc.accumulator(0, "End Accumulator")
>   println(">>> create streamingContext.")
>   val ssc = new org.apache.spark.streaming.StreamingContext(sc, org.apache.spark.streaming.Milliseconds(10000))
>   val endMarkerToken:String = "_END_"
>   val endMakerRDD = sc.makeRDD(Array(endMarkerToken), 1)
>   def _endMarkerFilter(source1 : org.apache.spark.streaming.dstream.DStream[String]) : org.apache.spark.streaming.dstream.DStream[String] = {
>     val retval = source1.filter(line => line.contains("_END_")==false)
>     retval
>   }
>   def func() : org.apache.spark.streaming.dstream.DStream[String] = {
>     // Simulate Stream... use full package name for Queue to avoid potential conflict with java.util.Queue
>     var data = Array(
>       "count,first_location_id,last_location_id",
>       "2000-01-01 01:00:00.1,a,1",
>       "2000-01-01 01:00:00.0,a,2"
>     )
>     val queue = scala.collection.mutable.Queue(sc.parallelize(data))
>     val textSource = new QueueDStream(ssc, queue, endMakerRDD)
>     textSource.foreachRDD(rdd => { rdd.foreach( item => {if (item == "_END_") {accum += 1}} ) })
>     val retval = _endMarkerFilter(textSource)
>     retval
>   }
>   val __1 = func()
>   print(__1.print)
>   println(">>> Start streaming context.")
>   ssc.start()
>   import scala.concurrent.ExecutionContext.Implicits.global
>   val stopFunc = scala.concurrent.Future {
>     var isRun = true
>     var duration = 0
>     while (isRun) {
>       Thread.sleep(1000)
>       duration += 1
>       if (accum.value > 0 || duration >= 20) {
>         println("### STOP SSC ###")
>         ssc.stop(false, true)
>         duration = 0
>         isRun = false
>       }
>     }
>   }
>   ssc.awaitTermination()
>   println(">>> Streaming context terminated.")
> }
> -----------
> Execute main. It takes 4 batches (at least 40 seconds) to finish.
> However, if a stop check is added to org.apache.spark.streaming.util.RecurringTimer::triggerActionForNextInterval as below:
> private def triggerActionForNextInterval(): Unit = {
>   clock.waitTillTime(nextTime)
>   if (!stopped) {
>     callback(nextTime)
>     prevTime = nextTime
>     nextTime += period
>     logDebug("Callback for " + name + " called at time " + prevTime)
>   }
> }
> then the streaming context can be stopped after only two batches.
> In addition, if the batch interval is small, e.g. 0.1 second, it takes more time to stop the streaming context gracefully.
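
The effect of the proposed stop check can be illustrated in isolation with a small sketch. This is not Spark's RecurringTimer. SketchTimer, checkStopAfterWait and callbacksAfterEarlyStop are hypothetical names introduced here, and the wait uses Thread.sleep instead of Spark's clock. It only assumes the loop shape quoted above: wait until the next tick, then invoke the callback, with stop() setting a flag and joining the timer thread without interrupting it.

```scala
import java.util.concurrent.atomic.AtomicInteger

object StopCheckSketch {
  // Hypothetical stand-in for a recurring timer; NOT Spark's RecurringTimer.
  class SketchTimer(periodMs: Long, checkStopAfterWait: Boolean)(callback: Long => Unit) {
    @volatile private var stopped = false
    private var nextTime = System.currentTimeMillis() + periodMs

    private val thread = new Thread(new Runnable {
      override def run(): Unit = {
        while (!stopped) {
          // Wait until the next scheduled tick.
          val waitMs = nextTime - System.currentTimeMillis()
          if (waitMs > 0) Thread.sleep(waitMs)
          // With the check enabled, a stop requested during the wait
          // suppresses the callback for this tick.
          if (!checkStopAfterWait || !stopped) {
            callback(nextTime)
            nextTime += periodMs
          }
        }
      }
    })
    thread.setDaemon(true)

    def start(): Unit = thread.start()

    // Non-interrupting stop: set the flag, then wait for the loop to exit.
    def stop(): Unit = { stopped = true; thread.join() }
  }

  // Requests a stop well before the first tick and counts the callbacks
  // that still fire afterwards.
  def callbacksAfterEarlyStop(checkStopAfterWait: Boolean): Int = {
    val fired = new AtomicInteger(0)
    val timer = new SketchTimer(200L, checkStopAfterWait)(_ => { fired.incrementAndGet(); () })
    timer.start()
    Thread.sleep(50L)
    timer.stop()
    fired.get()
  }
}
```

In this sketch, StopCheckSketch.callbacksAfterEarlyStop(false) lets one more callback run after the stop request, while StopCheckSketch.callbacksAfterEarlyStop(true) runs none. Whether the saving in a real streaming job matches the figures reported above depends on parts of the stop path that the sketch does not model.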
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)