Posted to commits@spark.apache.org by sr...@apache.org on 2019/09/03 18:01:36 UTC

[spark] branch branch-2.4 updated: [SPARK-22955][DSTREAMS] - graceful shutdown shouldn't lead to job gen…

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 1a5858f  [SPARK-22955][DSTREAMS] - graceful shutdown shouldn't lead to job gen…
1a5858f is described below

commit 1a5858f1b0be56a18832e68276c427d7a2c1808a
Author: Nikita Gorbachevsky <ni...@playtika.com>
AuthorDate: Mon Aug 26 21:42:20 2019 -0500

    [SPARK-22955][DSTREAMS] - graceful shutdown shouldn't lead to job gen…
    
    ### What changes were proposed in this pull request?
    During graceful shutdown of ``StreamingContext``, ``graph.stop()`` is invoked right after stopping the ``timer`` that generates new jobs. Thus it is possible that the latest jobs generated by the timer are still in the middle of generation, but the invocation of ``graph.stop()`` closes some objects required for job generation, e.g. the consumer for Kafka, and generation fails. That also leads to waiting out the full ``spark.streaming.gracefulStopTimeout``, which equals 10 batch intervals by default. S [...]
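
    For illustration, a minimal sketch of the fixed ordering inside
    ``JobGenerator.stop()`` (a simplified paraphrase of the patch below, not the
    verbatim source; ``hasTimedOut`` and ``pollTime`` stand in for the
    surrounding timeout bookkeeping):

        // stop the timer first, so that no new jobs are generated
        val stopTime = timer.stop(interruptTimer = false)
        // wait until every already-generated job has been processed
        while (!hasTimedOut && jobScheduler.getPendingTimes().nonEmpty) {
          Thread.sleep(pollTime)
        }
        // only now stop the graph, which closes e.g. Kafka consumers
        graph.stop()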
    
    ### How was this patch tested?
    Added test to existing test suite.
    
    Closes #25511 from choojoyq/SPARK-22955-job-generation-error-on-graceful-stop.
    
    Authored-by: Nikita Gorbachevsky <ni...@playtika.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../spark/streaming/scheduler/JobGenerator.scala   |  2 +-
 .../spark/streaming/StreamingContextSuite.scala    | 30 ++++++++++++++++++++--
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 8d83dc8..124baf1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -135,7 +135,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
       // Stop generating jobs
       val stopTime = timer.stop(interruptTimer = false)
-      graph.stop()
       logInfo("Stopped generation timer")
 
       // Wait for the jobs to complete and checkpoints to be written
@@ -147,6 +146,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
         Thread.sleep(pollTime)
       }
       logInfo("Waited for jobs to be processed and checkpoints to be written")
+      graph.stop()
     } else {
       logInfo("Stopping JobGenerator immediately")
       // Stop timer and graph immediately, ignore unprocessed data and pending jobs
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 52c8959..2652ed2 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -36,10 +36,11 @@ import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.metrics.source.Source
+import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{DStream, InputDStream}
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ManualClock, Utils}
 
 
 class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeLimits with Logging {
@@ -841,6 +842,31 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
     assert(latch.await(60, TimeUnit.SECONDS))
   }
 
+  test("SPARK-22955 graceful shutdown shouldn't lead to job generation error") {
+    val conf = new SparkConf().setMaster(master).setAppName(appName)
+    conf.set("spark.streaming.clock", classOf[ManualClock].getName)
+    conf.set("spark.streaming.gracefulStopTimeout", "60s")
+
+    ssc = new StreamingContext(conf, Milliseconds(100))
+
+    new InputDStream[Int](ssc) {
+      @volatile private var stopped = false
+      override def start(): Unit = {}
+      override def stop(): Unit = stopped = true
+      override def compute(validTime: Time): Option[RDD[Int]] = {
+        if (stopped) throw new IllegalStateException("Already stopped")
+        Some(ssc.sc.emptyRDD[Int])
+      }
+    }.register()
+
+    ssc.start()
+    // start generating batches constantly, without any delay
+    ssc.scheduler.clock.asInstanceOf[ManualClock].setTime(Long.MaxValue)
+    ssc.stop(stopSparkContext = true, stopGracefully = true)
+    // exception shouldn't be thrown
+    ssc.awaitTermination()
+  }
+
   def addInputStream(s: StreamingContext): DStream[Int] = {
     val input = (1 to 100).map(i => 1 to i)
     val inputStream = new TestInputStream(s, input, 1)

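For readers who want to reproduce the scenario outside the test suite, a
minimal standalone sketch (not part of the patch; the local[2] master,
queue-based input, and timings are illustrative assumptions):

    import scala.collection.mutable

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}

    object GracefulStopRepro {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("SPARK-22955-repro")
        val ssc = new StreamingContext(conf, Milliseconds(100))

        // at least one output operation must be registered before start()
        val queue = mutable.Queue(ssc.sparkContext.makeRDD(1 to 10))
        ssc.queueStream(queue).print()

        ssc.start()
        Thread.sleep(500) // let a few batches be generated
        // with the fix, graph.stop() runs only after in-flight jobs complete,
        // so a graceful stop no longer fails job generation mid-flight
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }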
