Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2014/03/27 03:58:11 UTC

[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/247

    [SPARK-1331] Added graceful shutdown to Spark Streaming

    The current version of StreamingContext.stop() directly kills all the data receivers (NetworkReceiver) without waiting for the data already received to be persisted and processed. This PR provides the fix. Now, when StreamingContext.stop() is called, the following sequence of steps happens.
    1. The driver sends a stop signal to all the active receivers.
    2. Each receiver, when it gets the stop signal from the driver, first stops receiving more data, then waits for the thread that persists data blocks to the BlockManager to finish persisting all received data, and finally quits.
    3. After all the receivers have stopped, the driver waits for the Job Generator and Job Scheduler to finish processing all the received data.

    It also fixes the semantics of StreamingContext.start() and stop(). Appropriate errors are now thrown, or warnings logged, if stop() is called before start(), stop() is called twice, and so on.
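
    For illustration, a minimal usage sketch of the new API (signature per the diffs later in this thread; the master URL and app name are placeholders):

        import org.apache.spark.streaming.{Seconds, StreamingContext}

        val ssc = new StreamingContext("local[2]", "graceful-shutdown-demo", Seconds(1))
        // ... set up input DStreams and output operations ...
        ssc.start()
        // ... later: wait for all received data to be processed before shutting down
        ssc.stop(stopSparkContext = true, stopGracefully = true)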

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark graceful-shutdown

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/247.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #247
    
----
commit c43b8ae52bc0fad5e6d742800f68fde436dff5bf
Author: Tathagata Das <ta...@gmail.com>
Date:   2014-02-06T09:48:28Z

    Added graceful shutdown to Spark Streaming.

----



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39545197
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13756/



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11227467
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -158,6 +158,15 @@ class StreamingContext private[streaming] (
     
       private[streaming] val waiter = new ContextWaiter
     
    +  /** Enumeration to identify current state of the StreamingContext */
    +  private[streaming] object ContextState extends Enumeration {
    --- End diff --
    
    minor: but how would you feel about calling this `StreamingContextState`? I found it a bit confusing when looking at it directly. Then you could probably even remove the doc because it would be totally clear.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39541530
  
     Build triggered. 



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11367807
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala ---
    @@ -17,20 +17,16 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
    -import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
    -import org.apache.spark.{SparkException, Logging, SparkEnv}
    -import org.apache.spark.SparkContext._
    -
    -import scala.collection.mutable.HashMap
    -import scala.collection.mutable.Queue
    -import scala.concurrent.duration._
    +import scala.collection.mutable.{HashMap, SynchronizedMap, Queue}
     
     import akka.actor._
    -import akka.pattern.ask
    -import akka.dispatch._
    +
    +import org.apache.spark.{SparkException, Logging, SparkEnv}
    --- End diff --
    
    nit: alphabetize



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11281598
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -124,84 +143,109 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
           receivingThread
     
           // Call user-defined onStart()
    +      logInfo("Calling onStart")
           onStart()
    +
    +      // Wait until interrupt is called on this thread
    +      while(true) Thread.sleep(100000)
         } catch {
           case ie: InterruptedException =>
    -        logInfo("Receiving thread interrupted")
    -        //println("Receiving thread interrupted")
    +        logInfo("Receiving thread has been interrupted, receiver "  + streamId + " stopped")
           case e: Exception =>
    -        stopOnError(e)
    +        logError("Error receiving data in receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    // Call user-defined onStop()
    +    logInfo("Calling onStop")
    +    try {
    +      onStop()
    +    } catch {
    +      case  e: Exception =>
    +        logError("Error stopping receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    val message = if (exceptions.isEmpty) {
    +      null
    +    } else if (exceptions.size == 1) {
    +      val e = exceptions.head
    +      "Exception in receiver " + streamId + ": " + e.getMessage + "\n" + e.getStackTraceString
    +    } else {
    +      "Multiple exceptions in receiver " + streamId + "(" + exceptions.size + "):\n"
    +        exceptions.zipWithIndex.map {
    +          case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
    +        }.mkString("\n")
         }
    +    logInfo("Deregistering receiver " + streamId)
    +    val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
    +    Await.result(future, askTimeout)
    +    logInfo("Deregistered receiver " + streamId)
    +    env.actorSystem.stop(actor)
    +    logInfo("Stopped receiver " + streamId)
       }
     
       /**
    -   * Stops the receiver. First it interrupts the main receiving thread,
    +   * Stop the receiver. First it interrupts the main receiving thread,
        * that is, the thread that called receiver.start(). Then it calls the user-defined
    --- End diff --
    
    Why do you think so?



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11227633
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -430,12 +448,27 @@ class StreamingContext private[streaming] (
       /**
        * Stop the execution of the streams.
        * @param stopSparkContext Stop the associated SparkContext or not
    +   * @param stopGracefully Stop gracefully by waiting for the processing of all
    +   *                       received data to be completed
        */
    -  def stop(stopSparkContext: Boolean = true) = synchronized {
    -    scheduler.stop()
    +  def stop(
    +      stopSparkContext: Boolean = true,
    +      stopGracefully: Boolean = false
    +    ): Unit = synchronized {
    +    // Silently warn if context is stopped twice, or context is stopped before starting
    --- End diff --
    
    What does "silently" mean here? Doesn't it print a warning? Or did you mean "warn but don't fail"?



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11371044
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---
    @@ -215,4 +271,29 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
       }
     }
     
    -class TestException(msg: String) extends Exception(msg)
    \ No newline at end of file
    +class TestException(msg: String) extends Exception(msg)
    +
    +/** Custom receiver for testing whether all data received by a receiver gets processed or not */
    +class TestReceiver extends NetworkReceiver[Int] {
    +  protected lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY)
    +  protected def onStart() {
    +    blockGenerator.start()
    +    logInfo("BlockGenerator started on thread " + receivingThread)
    +    try {
    +      while(true) {
    +        blockGenerator += TestReceiver.counter.getAndIncrement
    +        Thread.sleep(0)
    +      }
    +    } finally {
    +      logInfo("Receiving stopped at count value of " + TestReceiver.counter.get())
    +    }
    +  }
    +
    +  protected def onStop() {
    +    blockGenerator.stop()
    +  }
    +}
    +
    +object TestReceiver {
    +  val counter = new AtomicInteger(1)
    +}
    --- End diff --
    
    Done.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11283933
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -124,84 +143,109 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
           receivingThread
     
           // Call user-defined onStart()
    +      logInfo("Calling onStart")
           onStart()
    +
    +      // Wait until interrupt is called on this thread
    +      while(true) Thread.sleep(100000)
         } catch {
           case ie: InterruptedException =>
    -        logInfo("Receiving thread interrupted")
    -        //println("Receiving thread interrupted")
    +        logInfo("Receiving thread has been interrupted, receiver "  + streamId + " stopped")
           case e: Exception =>
    -        stopOnError(e)
    +        logError("Error receiving data in receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    // Call user-defined onStop()
    +    logInfo("Calling onStop")
    +    try {
    +      onStop()
    +    } catch {
    +      case  e: Exception =>
    +        logError("Error stopping receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    val message = if (exceptions.isEmpty) {
    +      null
    +    } else if (exceptions.size == 1) {
    +      val e = exceptions.head
    +      "Exception in receiver " + streamId + ": " + e.getMessage + "\n" + e.getStackTraceString
    +    } else {
    +      "Multiple exceptions in receiver " + streamId + "(" + exceptions.size + "):\n"
    +        exceptions.zipWithIndex.map {
    +          case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
    +        }.mkString("\n")
         }
    +    logInfo("Deregistering receiver " + streamId)
    +    val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
    +    Await.result(future, askTimeout)
    +    logInfo("Deregistered receiver " + streamId)
    +    env.actorSystem.stop(actor)
    +    logInfo("Stopped receiver " + streamId)
       }
     
       /**
    -   * Stops the receiver. First it interrupts the main receiving thread,
    +   * Stop the receiver. First it interrupts the main receiving thread,
        * that is, the thread that called receiver.start(). Then it calls the user-defined
    --- End diff --
    
    Oops! Sorry! Yeah, changed that.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-38854674
  
    @pwendell



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39799433
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11229360
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala ---
    @@ -17,44 +17,80 @@
     
     package org.apache.spark.streaming.util
     
    +import org.apache.spark.Logging
    +
     private[streaming]
    -class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
    +class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
    +  extends Logging {
       
    -  private val thread = new Thread("RecurringTimer") {
    +  private val thread = new Thread("RecurringTimer - " + name) {
    +    setDaemon(true)
         override def run() { loop }    
       }
    -  
    -  private var nextTime = 0L
     
    +  private var prevTime = -1L
    +  private var nextTime = -1L
    +  private var stopped = false
    +
    +  /**
    +   * Get the earliest time when this timer can be started. The time must be a
    +   * multiple of this timer's period and more than current time.
    +   */
       def getStartTime(): Long = {
         (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
       }
     
    +  /**
    +   * Get the earliest time when this timer can be restarted, based on the earlier start time.
    --- End diff --
    
    I found this really confusing to follow. Could this say "based on when it started" rather than "based on the earlier start time"?
    
    Also, can you give an example in the doc here? In particular, when is this different from the value returned by `getStartTime`? I'm a bit confused on that, because it seems like this would always return the same result (but I'm sure there is a difference).
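
    For reference, a worked example under the diff's formulas (standalone versions with hypothetical numbers); the two differ whenever the original start time is not a multiple of the period:

        // Standalone versions of the diff's formulas:
        def getStartTime(now: Long, period: Long): Long =
          (math.floor(now.toDouble / period) + 1).toLong * period

        def getRestartTime(now: Long, period: Long, originalStartTime: Long): Long = {
          val gap = now - originalStartTime
          (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
        }

        // With period = 1000, originalStartTime = 400, now = 2500:
        getStartTime(2500, 1000)          // 3000: next multiple of the period
        getRestartTime(2500, 1000, 400)   // 3400: next tick aligned to the original start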



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39801451
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13865/



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39781388
  
     Build triggered. 



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11367954
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---
    @@ -215,4 +271,29 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
       }
     }
     
    -class TestException(msg: String) extends Exception(msg)
    \ No newline at end of file
    +class TestException(msg: String) extends Exception(msg)
    +
    +/** Custom receiver for testing whether all data received by a receiver gets processed or not */
    +class TestReceiver extends NetworkReceiver[Int] {
    +  protected lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY)
    +  protected def onStart() {
    +    blockGenerator.start()
    +    logInfo("BlockGenerator started on thread " + receivingThread)
    +    try {
    +      while(true) {
    +        blockGenerator += TestReceiver.counter.getAndIncrement
    +        Thread.sleep(0)
    +      }
    +    } finally {
    +      logInfo("Receiving stopped at count value of " + TestReceiver.counter.get())
    +    }
    +  }
    +
    +  protected def onStop() {
    +    blockGenerator.stop()
    +  }
    +}
    +
    +object TestReceiver {
    +  val counter = new AtomicInteger(1)
    +}
    --- End diff --
    
    nit: need new line here



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39541536
  
    Build started. 



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39534715
  
    Hey @pwendell I did a pass as well. I have punted on some of the network receiver related changes for now, as those have actually changed with #300 and it's best to analyze them in the context of those changes.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39534643
  
     Build triggered. 



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39781404
  
    Build started. 



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11281522
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -112,7 +131,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
       def getLocationPreference() : Option[String] = None
     
       /**
    -   * Starts the receiver. First is accesses all the lazy members to
    +   * Start the receiver. First is accesses all the lazy members to
    --- End diff --
    
    I am not entirely sure. Let's address this in the other PR, #300, where this is refactored anyway.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11282380
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -124,84 +143,109 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
           receivingThread
     
           // Call user-defined onStart()
    +      logInfo("Calling onStart")
           onStart()
    +
    +      // Wait until interrupt is called on this thread
    +      while(true) Thread.sleep(100000)
         } catch {
           case ie: InterruptedException =>
    -        logInfo("Receiving thread interrupted")
    -        //println("Receiving thread interrupted")
    +        logInfo("Receiving thread has been interrupted, receiver "  + streamId + " stopped")
           case e: Exception =>
    -        stopOnError(e)
    +        logError("Error receiving data in receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    // Call user-defined onStop()
    +    logInfo("Calling onStop")
    +    try {
    +      onStop()
    +    } catch {
    +      case  e: Exception =>
    +        logError("Error stopping receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    val message = if (exceptions.isEmpty) {
    +      null
    +    } else if (exceptions.size == 1) {
    +      val e = exceptions.head
    +      "Exception in receiver " + streamId + ": " + e.getMessage + "\n" + e.getStackTraceString
    +    } else {
    +      "Multiple exceptions in receiver " + streamId + "(" + exceptions.size + "):\n"
    +        exceptions.zipWithIndex.map {
    +          case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
    +        }.mkString("\n")
         }
    +    logInfo("Deregistering receiver " + streamId)
    +    val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
    +    Await.result(future, askTimeout)
    +    logInfo("Deregistered receiver " + streamId)
    +    env.actorSystem.stop(actor)
    +    logInfo("Stopped receiver " + streamId)
       }
     
       /**
    -   * Stops the receiver. First it interrupts the main receiving thread,
    +   * Stop the receiver. First it interrupts the main receiving thread,
        * that is, the thread that called receiver.start(). Then it calls the user-defined
        * onStop() method to stop other threads and/or do cleanup.
        */
       def stop() {
    +    // Stop receiving by interrupting the receiving thread
         receivingThread.interrupt()
    --- End diff --
    
    Summarizing our offline conversation: we will document the semantics of onStart() not blocking, and convert the sleep to a condition variable. But I think it's best to address this in PR #300, rather than duplicating the changes.
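
    For illustration, a minimal sketch of the condition-variable approach (assumed names, not the actual PR #300 code): the receiver's main thread blocks on a latch instead of looping on Thread.sleep, and stop() releases it.

        import java.util.concurrent.CountDownLatch

        class ReceiverStopSketch {
          private val stopLatch = new CountDownLatch(1)

          // Replaces `while(true) Thread.sleep(100000)`: block until stop() is called.
          def awaitStop(): Unit = stopLatch.await()

          // Replaces interrupting the receiving thread.
          def stop(): Unit = stopLatch.countDown()
        }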



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-38765200
  
    Build started.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11228838
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -124,84 +143,109 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
           receivingThread
     
           // Call user-defined onStart()
    +      logInfo("Calling onStart")
           onStart()
    +
    +      // Wait until interrupt is called on this thread
    +      while(true) Thread.sleep(100000)
         } catch {
           case ie: InterruptedException =>
    -        logInfo("Receiving thread interrupted")
    -        //println("Receiving thread interrupted")
    +        logInfo("Receiving thread has been interrupted, receiver "  + streamId + " stopped")
           case e: Exception =>
    -        stopOnError(e)
    +        logError("Error receiving data in receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    // Call user-defined onStop()
    +    logInfo("Calling onStop")
    +    try {
    +      onStop()
    +    } catch {
    +      case  e: Exception =>
    +        logError("Error stopping receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    val message = if (exceptions.isEmpty) {
    +      null
    +    } else if (exceptions.size == 1) {
    +      val e = exceptions.head
    +      "Exception in receiver " + streamId + ": " + e.getMessage + "\n" + e.getStackTraceString
    +    } else {
    +      "Multiple exceptions in receiver " + streamId + "(" + exceptions.size + "):\n"
    +        exceptions.zipWithIndex.map {
    +          case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
    +        }.mkString("\n")
         }
    +    logInfo("Deregistering receiver " + streamId)
    +    val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
    +    Await.result(future, askTimeout)
    +    logInfo("Deregistered receiver " + streamId)
    +    env.actorSystem.stop(actor)
    +    logInfo("Stopped receiver " + streamId)
       }
     
       /**
    -   * Stops the receiver. First it interrupts the main receiving thread,
    +   * Stop the receiver. First it interrupts the main receiving thread,
        * that is, the thread that called receiver.start(). Then it calls the user-defined
        * onStop() method to stop other threads and/or do cleanup.
        */
       def stop() {
    +    // Stop receiving by interrupting the receiving thread
         receivingThread.interrupt()
    -    onStop()
    -    //TODO: terminate the actor
    +    logInfo("Interrupted receiving thread " + receivingThread + " for stopping")
       }
     
       /**
    -   * Stops the receiver and reports exception to the tracker.
    +   * Stop the receiver and reports exception to the tracker.
        * This should be called whenever an exception is to be handled on any thread
        * of the receiver.
        */
       protected def stopOnError(e: Exception) {
         logError("Error receiving data", e)
    +    exceptions += e
         stop()
    -    actor ! ReportError(e.toString)
       }
     
    -
       /**
    -   * Pushes a block (as an ArrayBuffer filled with data) into the block manager.
    +   * Push a block (as an ArrayBuffer filled with data) into the block manager.
        */
       def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
    +    logInfo("Block " + blockId + " has last element as " + arrayBuffer.last)
         env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
    -    actor ! ReportBlock(blockId, metadata)
    +    trackerActor ! AddBlocks(streamId, Array(blockId), metadata)
    +    logInfo("Pushed block " + blockId)
    --- End diff --
    
    This also seems pretty chatty... do we want to log at `INFO` level at block granularity?



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11367369
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -112,7 +131,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
       def getLocationPreference() : Option[String] = None
     
       /**
    -   * Starts the receiver. First is accesses all the lazy members to
    +   * Start the receiver. First is accesses all the lazy members to
    --- End diff --
    
    First *it* accesses



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39802028
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13866/



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11367644
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---
    @@ -77,30 +82,79 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
         }
       }
     
    -  /** Stop generation of jobs */
    -  def stop() = synchronized {
    -    if (eventActor != null) {
    -      timer.stop()
    -      ssc.env.actorSystem.stop(eventActor)
    -      if (checkpointWriter != null) checkpointWriter.stop()
    -      ssc.graph.stop()
    -      logInfo("JobGenerator stopped")
    +  /**
    +   * Stop generation of jobs. processReceivedData = true makes this wait until jobs
    +   * of current ongoing time interval has been generated, processed and corresponding
    +   * checkpoints written.
    +   */
    +  def stop(processReceivedData: Boolean): Unit = synchronized {
    +    if (eventActor == null) return // generator has already been stopped
    +
    +    if (processReceivedData) {
    +      logInfo("Stopping JobGenerator gracefully")
    +      val timeWhenStopStarted = System.currentTimeMillis()
    +      val stopTimeout = 10 * ssc.graph.batchDuration.milliseconds
    +      val pollTime = 100
    +
    +      // To prevent graceful stop to get stuck permanently
    +      def hasTimedOut = {
    +        val timedOut = System.currentTimeMillis() - timeWhenStopStarted > stopTimeout
    +        if (timedOut) logWarning("Timed out while stopping the job generator")
    +        timedOut
    +      }
    +
    +      // Wait until all the received blocks in the network input tracker has
    +      // been consumed by network input DStreams, and jobs have been generated with them
    +      logInfo("Waiting for all received blocks to be consumed for job generation")
    +      while(!hasTimedOut && jobScheduler.networkInputTracker.hasMoreReceivedBlockIds) {
    +        Thread.sleep(pollTime)
    +      }
    +      logInfo("Waited for all received blocsk to be consumed for job generation")
    --- End diff --
    
    blocsk



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11235032
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---
    @@ -77,30 +82,79 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
         }
       }
     
    -  /** Stop generation of jobs */
    -  def stop() = synchronized {
    -    if (eventActor != null) {
    +  /**
    +   * Stop generation of jobs. processAllReceivedData = true makes this wait until jobs
    +   * of current ongoing time interval has been generated, processed and corresponding
    +   * checkpoints written.
    +   */
    +  def stop(processAllReceivedData: Boolean): Unit = synchronized {
    --- End diff --
    
    I think this could be shortened to `processReceivedData` without loss of precision.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11358984
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---
    @@ -133,18 +147,61 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
         ssc.start()
         ssc.stop()
         ssc.stop()
    -    ssc = null
       }
     
    +  test("stop before start and start after stop") {
    +    ssc = new StreamingContext(master, appName, batchDuration)
    +    addInputStream(ssc).register
    +    ssc.stop()  // stop before start should not throw exception
    +    ssc.start()
    +    ssc.stop()
    +    intercept[SparkException] {
    +      ssc.start() // start after stop should throw exception
    +    }
    +  }
    +
    +
       test("stop only streaming context") {
         ssc = new StreamingContext(master, appName, batchDuration)
         sc = ssc.sparkContext
         addInputStream(ssc).register
         ssc.start()
         ssc.stop(false)
    -    ssc = null
         assert(sc.makeRDD(1 to 100).collect().size === 100)
         ssc = new StreamingContext(sc, batchDuration)
    +    addInputStream(ssc).register
    +    ssc.start()
    +    ssc.stop()
    +  }
    +
    +  test("stop gracefully") {
    --- End diff --
    
    doing it.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/247



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11228531
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -124,84 +143,109 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
           receivingThread
     
           // Call user-defined onStart()
    +      logInfo("Calling onStart")
           onStart()
    +
    +      // Wait until interrupt is called on this thread
    +      while(true) Thread.sleep(100000)
    --- End diff --
    
    I'm a little confused here about whether the receiver `onStart` is supposed to block forever or whether it's supposed to exit in the normal case. It would be good to add documentation making it clear whether this represents an error case or a normal case.
    
    Also, instead of busy waiting, why not sleep on a condition variable here?



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-38765648
  
    One or more automated tests failed
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13491/



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39787097
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13854/



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11281342
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -430,12 +448,27 @@ class StreamingContext private[streaming] (
       /**
        * Stop the execution of the streams.
        * @param stopSparkContext Stop the associated SparkContext or not
    +   * @param stopGracefully Stop gracefully by waiting for the processing of all
    +   *                       received data to be completed
        */
    -  def stop(stopSparkContext: Boolean = true) = synchronized {
    -    scheduler.stop()
    +  def stop(
    --- End diff --
    
    I did implement another function. Let's not break stuff unless we absolutely have to.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-38765647
  
    Build finished.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11228760
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -124,84 +143,109 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
           receivingThread
     
           // Call user-defined onStart()
    +      logInfo("Calling onStart")
           onStart()
    +
    +      // Wait until interrupt is called on this thread
    +      while(true) Thread.sleep(100000)
         } catch {
           case ie: InterruptedException =>
    -        logInfo("Receiving thread interrupted")
    -        //println("Receiving thread interrupted")
    +        logInfo("Receiving thread has been interrupted, receiver "  + streamId + " stopped")
           case e: Exception =>
    -        stopOnError(e)
    +        logError("Error receiving data in receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    // Call user-defined onStop()
    +    logInfo("Calling onStop")
    +    try {
    +      onStop()
    +    } catch {
    +      case  e: Exception =>
    +        logError("Error stopping receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    val message = if (exceptions.isEmpty) {
    +      null
    +    } else if (exceptions.size == 1) {
    +      val e = exceptions.head
    +      "Exception in receiver " + streamId + ": " + e.getMessage + "\n" + e.getStackTraceString
    +    } else {
    +      "Multiple exceptions in receiver " + streamId + "(" + exceptions.size + "):\n"
    +        exceptions.zipWithIndex.map {
    +          case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
    +        }.mkString("\n")
         }
    +    logInfo("Deregistering receiver " + streamId)
    +    val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
    +    Await.result(future, askTimeout)
    +    logInfo("Deregistered receiver " + streamId)
    +    env.actorSystem.stop(actor)
    +    logInfo("Stopped receiver " + streamId)
       }
     
       /**
    -   * Stops the receiver. First it interrupts the main receiving thread,
    +   * Stop the receiver. First it interrupts the main receiving thread,
        * that is, the thread that called receiver.start(). Then it calls the user-defined
        * onStop() method to stop other threads and/or do cleanup.
        */
       def stop() {
    +    // Stop receiving by interrupting the receiving thread
         receivingThread.interrupt()
    -    onStop()
    -    //TODO: terminate the actor
    +    logInfo("Interrupted receiving thread " + receivingThread + " for stopping")
       }
     
       /**
    -   * Stops the receiver and reports exception to the tracker.
    +   * Stop the receiver and reports exception to the tracker.
        * This should be called whenever an exception is to be handled on any thread
        * of the receiver.
        */
       protected def stopOnError(e: Exception) {
         logError("Error receiving data", e)
    +    exceptions += e
         stop()
    -    actor ! ReportError(e.toString)
       }
     
    -
       /**
    -   * Pushes a block (as an ArrayBuffer filled with data) into the block manager.
    +   * Push a block (as an ArrayBuffer filled with data) into the block manager.
        */
       def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
    +    logInfo("Block " + blockId + " has last element as " + arrayBuffer.last)
    --- End diff --
    
    Do we want to log this every time? It seems really chatty. Was this a debug statement?



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11284020
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala ---
    @@ -17,44 +17,80 @@
     
     package org.apache.spark.streaming.util
     
    +import org.apache.spark.Logging
    +
     private[streaming]
    -class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
    +class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
    +  extends Logging {
       
    -  private val thread = new Thread("RecurringTimer") {
    +  private val thread = new Thread("RecurringTimer - " + name) {
    +    setDaemon(true)
         override def run() { loop }    
       }
    -  
    -  private var nextTime = 0L
     
    +  private var prevTime = -1L
    +  private var nextTime = -1L
    +  private var stopped = false
    +
    +  /**
    +   * Get the earliest time when this timer can be started. The time must be a
    +   * multiple of this timer's period and more than current time.
    +   */
       def getStartTime(): Long = {
         (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
       }
     
    +  /**
    +   * Get the earliest time when this timer can be restarted, based on the earlier start time.
    +   * The time must be a multiple of this timer's period and more than current time.
    +   */
       def getRestartTime(originalStartTime: Long): Long = {
         val gap = clock.currentTime - originalStartTime
         (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
       }
     
    -  def start(startTime: Long): Long = {
    +  /**
    +   * Start at the given start time.
    +   */
    +  def start(startTime: Long): Long = synchronized {
         nextTime = startTime
         thread.start()
    +    logInfo("Started timer for " + name + " at time " + nextTime)
         nextTime
       }
     
    +  /**
    +   * Start at the earliest time it can start based on the period.
    +   */
       def start(): Long = {
         start(getStartTime())
       }
     
    -  def stop() {
    -    thread.interrupt() 
    +  /**
    +   * Stop the timer. stopAfterNextCallback = true make it wait for next callback to be completed.
    +   * Returns the last time when it had called back.
    +   */
    +  def stop(stopAfterNextCallback: Boolean = false): Long = synchronized {
    --- End diff --
    
    I made it `interruptTimer` to make it even more generic, and made it a non-default param.
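
    For illustration, a minimal sketch of the stop contract described here (assumed shape, not the PR's final code): interruptTimer = true interrupts the timer thread immediately, while interruptTimer = false lets the in-flight callback complete; either way the last callback time is returned.

        class TimerStopSketch(period: Long, callback: Long => Unit) {
          @volatile private var stopped = false
          @volatile private var prevTime = -1L

          private val thread = new Thread("TimerStopSketch") {
            override def run(): Unit = {
              try {
                var nextTime = (System.currentTimeMillis() / period + 1) * period
                while (!stopped) {
                  val sleepMs = nextTime - System.currentTimeMillis()
                  if (sleepMs > 0) Thread.sleep(sleepMs)
                  callback(nextTime)
                  prevTime = nextTime
                  nextTime += period
                }
              } catch {
                case _: InterruptedException => // interrupted via stop(interruptTimer = true)
              }
            }
          }

          def start(): Unit = thread.start()

          def stop(interruptTimer: Boolean): Long = {
            stopped = true
            if (interruptTimer) thread.interrupt()
            thread.join() // otherwise, wait for the current callback to complete
            prevTime      // last time the callback fired
          }
        }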


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11227829
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -124,84 +143,109 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
           receivingThread
     
           // Call user-defined onStart()
    +      logInfo("Calling onStart")
    --- End diff --
    
    If this is user facing, would it make sense to have the message not depend on the function name? E.g., "Starting the receiver" or something?



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39801450
  
    Merged build finished. 



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39545196
  
    Build finished. 



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39534742
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13754/



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-38764889
  
    Would adding a lifecycle interface be a good idea?
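
    For illustration, such a lifecycle interface might look something like this hypothetical trait (the names here are illustrative, not from this PR):

        trait Lifecycle {
          // Start the component; implementations should be safe to call once.
          def start(): Unit
          // Stop the component, optionally draining in-flight work first.
          def stop(graceful: Boolean): Unit
          // Report whether the component is currently running.
          def isStarted: Boolean
        }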



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11367750
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala ---
    @@ -50,36 +50,54 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       private var eventActor: ActorRef = null
     
     
    -  def start() = synchronized {
    -    if (eventActor != null) {
    -      throw new SparkException("JobScheduler already started")
    -    }
    +  def start(): Unit = synchronized {
    +    if (eventActor != null) return // scheduler has already been started
     
    +    logDebug("Starting JobScheduler")
         eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
           def receive = {
             case event: JobSchedulerEvent => processEvent(event)
           }
         }), "JobScheduler")
    +
         listenerBus.start()
         networkInputTracker = new NetworkInputTracker(ssc)
         networkInputTracker.start()
    -    Thread.sleep(1000)
         jobGenerator.start()
    -    logInfo("JobScheduler started")
    +    logInfo("Started JobScheduler")
       }
     
    -  def stop() = synchronized {
    -    if (eventActor != null) {
    -      jobGenerator.stop()
    -      networkInputTracker.stop()
    -      executor.shutdown()
    -      if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
    -        executor.shutdownNow()
    -      }
    -      listenerBus.stop()
    -      ssc.env.actorSystem.stop(eventActor)
    -      logInfo("JobScheduler stopped")
    +  def stop(processAllReceivedData: Boolean): Unit = synchronized {
    +    if (eventActor == null) return // scheduler has already been stopped
    +    logDebug("Stopping JobScheduler")
    +
    +    // First, stop receiving
    +    networkInputTracker.stop()
    +
    +    // Second, stop generating jobs. If it has to process all received data,
    +    // then this will wait for all the processing through JobScheduler to be over.
    +    jobGenerator.stop(processAllReceivedData)
    +
    +    // Stop the executor for receiving new jobs
    +    logDebug("Stopping job executor")
    +    jobExecutor.shutdown()
    +
    +    // Wait for the queued jobs to complete if indicated
    +    val terminated = if (processAllReceivedData) {
    +      jobExecutor.awaitTermination(1, TimeUnit.HOURS)  // just a very large period of time
    +    } else {
    +      jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
         }
    +    if (!terminated) {
    +      jobExecutor.shutdownNow()
    +    }
    +    logDebug("Stopped job executor")
    +
    +    // Stop everything else
    +    listenerBus.stop()
    +    ssc.env.actorSystem.stop(eventActor)
    +    eventActor == null
    --- End diff --
    
    did you mean `eventActor = null`?



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11281176
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -158,6 +158,15 @@ class StreamingContext private[streaming] (
     
       private[streaming] val waiter = new ContextWaiter
     
    +  /** Enumeration to identify current state of the StreamingContext */
    +  private[streaming] object ContextState extends Enumeration {
    --- End diff --
    
    Changed. But keeping the doc in. Extra doc does no harm. ;)



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39402799
  
    Hey TD - Did a pass. Had some questions and surface-level comments.
    
    One thing that came up a few times was the use of a busy-wait/Thread.sleep along with interruption. Ideally, rather than calling sleep in a loop, you would sleep on a condition variable and notify it from the other thread. With the current approach you anyway have to keep a pointer to the thread so you know what to interrupt. Why not just have a CV?
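
    A minimal sketch of that condition-variable approach, with hypothetical names (not code from this PR): the waiting thread blocks on the condition instead of polling, and the stopping thread signals it rather than interrupting.

        import java.util.concurrent.locks.ReentrantLock

        class StopCondition {
          private val lock = new ReentrantLock()
          private val stoppedCond = lock.newCondition()
          private var stopped = false

          // Called by the thread that would otherwise sleep in a loop.
          def awaitStop(): Unit = {
            lock.lock()
            try {
              while (!stopped) stoppedCond.await()  // blocks without polling
            } finally {
              lock.unlock()
            }
          }

          // Called by the stopping thread instead of Thread.interrupt().
          def signalStop(): Unit = {
            lock.lock()
            try {
              stopped = true
              stoppedCond.signalAll()
            } finally {
              lock.unlock()
            }
          }
        }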



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11282425
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -124,84 +143,109 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
           receivingThread
     
           // Call user-defined onStart()
    +      logInfo("Calling onStart")
           onStart()
    +
    +      // Wait until interrupt is called on this thread
    +      while(true) Thread.sleep(100000)
         } catch {
           case ie: InterruptedException =>
    -        logInfo("Receiving thread interrupted")
    -        //println("Receiving thread interrupted")
    +        logInfo("Receiving thread has been interrupted, receiver "  + streamId + " stopped")
           case e: Exception =>
    -        stopOnError(e)
    +        logError("Error receiving data in receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    // Call user-defined onStop()
    +    logInfo("Calling onStop")
    +    try {
    +      onStop()
    +    } catch {
    +      case  e: Exception =>
    +        logError("Error stopping receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    val message = if (exceptions.isEmpty) {
    +      null
    +    } else if (exceptions.size == 1) {
    +      val e = exceptions.head
    +      "Exception in receiver " + streamId + ": " + e.getMessage + "\n" + e.getStackTraceString
    +    } else {
    +      "Multiple exceptions in receiver " + streamId + "(" + exceptions.size + "):\n"
    +        exceptions.zipWithIndex.map {
    +          case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
    +        }.mkString("\n")
         }
    +    logInfo("Deregistering receiver " + streamId)
    +    val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
    +    Await.result(future, askTimeout)
    +    logInfo("Deregistered receiver " + streamId)
    +    env.actorSystem.stop(actor)
    +    logInfo("Stopped receiver " + streamId)
       }
     
       /**
    -   * Stops the receiver. First it interrupts the main receiving thread,
    +   * Stop the receiver. First it interrupts the main receiving thread,
        * that is, the thread that called receiver.start(). Then it calls the user-defined
        * onStop() method to stop other threads and/or do cleanup.
        */
       def stop() {
    +    // Stop receiving by interrupting the receiving thread
         receivingThread.interrupt()
    -    onStop()
    -    //TODO: terminate the actor
    +    logInfo("Interrupted receiving thread " + receivingThread + " for stopping")
       }
     
       /**
    -   * Stops the receiver and reports exception to the tracker.
    +   * Stop the receiver and reports exception to the tracker.
        * This should be called whenever an exception is to be handled on any thread
        * of the receiver.
        */
       protected def stopOnError(e: Exception) {
         logError("Error receiving data", e)
    +    exceptions += e
         stop()
    -    actor ! ReportError(e.toString)
       }
     
    -
       /**
    -   * Pushes a block (as an ArrayBuffer filled with data) into the block manager.
    +   * Push a block (as an ArrayBuffer filled with data) into the block manager.
        */
       def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
    +    logInfo("Block " + blockId + " has last element as " + arrayBuffer.last)
         env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
    -    actor ! ReportBlock(blockId, metadata)
    +    trackerActor ! AddBlocks(streamId, Array(blockId), metadata)
    +    logInfo("Pushed block " + blockId)
    --- End diff --
    
    Converted to debug.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39534647
  
    Build started. 



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39799994
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11371039
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---
    @@ -21,15 +21,17 @@ import org.scalatest.{FunSuite, BeforeAndAfter}
     import org.scalatest.exceptions.TestFailedDueToTimeoutException
     import org.scalatest.concurrent.Timeouts
     import org.scalatest.time.SpanSugar._
    -import org.apache.spark.{SparkException, SparkConf, SparkContext}
    +import org.apache.spark.{Logging, SparkException, SparkConf, SparkContext}
     import org.apache.spark.util.{Utils, MetadataCleaner}
    -import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.dstream.{NetworkReceiver, DStream}
    +import java.util.concurrent.atomic.AtomicInteger
    --- End diff --
    
    Done



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11281494
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -430,12 +448,27 @@ class StreamingContext private[streaming] (
       /**
        * Stop the execution of the streams.
        * @param stopSparkContext Stop the associated SparkContext or not
    +   * @param stopGracefully Stop gracefully by waiting for the processing of all
    +   *                       received data to be completed
        */
    -  def stop(stopSparkContext: Boolean = true) = synchronized {
    -    scheduler.stop()
    +  def stop(
    +      stopSparkContext: Boolean = true,
    +      stopGracefully: Boolean = false
    +    ): Unit = synchronized {
    +    // Silently warn if context is stopped twice, or context is stopped before starting
    --- End diff --
    
    I removed "silently". But on that note, do you think the semantics make sense?
    1. Stopping twice --> only warn
    2. Stop before start --> only warn
    3. Starting twice --> fail
    4. Starting after stopping --> fail
    
    ?
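
    A rough sketch of those four cases as a single state check (hypothetical names, not the PR's exact code):

        object StateDemo {
          object State extends Enumeration { val Initialized, Started, Stopped = Value }
          private var state = State.Initialized

          def start(): Unit = synchronized {
            state match {
              case State.Initialized => state = State.Started
              case State.Started => throw new IllegalStateException("already started")  // case 3: fail
              case State.Stopped => throw new IllegalStateException("cannot restart")   // case 4: fail
            }
          }

          def stop(): Unit = synchronized {
            state match {
              case State.Started => state = State.Stopped
              case _ => println("WARN: stop() on a context that is not running")        // cases 1, 2: warn
            }
          }
        }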



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11371038
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala ---
    @@ -17,20 +17,16 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
    -import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
    -import org.apache.spark.{SparkException, Logging, SparkEnv}
    -import org.apache.spark.SparkContext._
    -
    -import scala.collection.mutable.HashMap
    -import scala.collection.mutable.Queue
    -import scala.concurrent.duration._
    +import scala.collection.mutable.{HashMap, SynchronizedMap, Queue}
     
     import akka.actor._
    -import akka.pattern.ask
    -import akka.dispatch._
    +
    +import org.apache.spark.{SparkException, Logging, SparkEnv}
    --- End diff --
    
    Done.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11234673
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala ---
    @@ -17,44 +17,80 @@
     
     package org.apache.spark.streaming.util
     
    +import org.apache.spark.Logging
    +
     private[streaming]
    -class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
    +class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
    +  extends Logging {
       
    -  private val thread = new Thread("RecurringTimer") {
    +  private val thread = new Thread("RecurringTimer - " + name) {
    +    setDaemon(true)
         override def run() { loop }    
       }
    -  
    -  private var nextTime = 0L
     
    +  private var prevTime = -1L
    +  private var nextTime = -1L
    +  private var stopped = false
    +
    +  /**
    +   * Get the earliest time when this timer can be started. The time must be a
    +   * multiple of this timer's period and more than the current time.
    +   */
       def getStartTime(): Long = {
         (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
       }
     
    +  /**
    +   * Get the earliest time when this timer can be restarted, based on the earlier start time.
    +   * The time must be a multiple of this timer's period and more than current time.
    +   */
       def getRestartTime(originalStartTime: Long): Long = {
         val gap = clock.currentTime - originalStartTime
         (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
       }
     
    -  def start(startTime: Long): Long = {
    +  /**
    +   * Start at the given start time.
    +   */
    +  def start(startTime: Long): Long = synchronized {
         nextTime = startTime
         thread.start()
    +    logInfo("Started timer for " + name + " at time " + nextTime)
         nextTime
       }
     
    +  /**
    +   * Start at the earliest time it can start based on the period.
    +   */
       def start(): Long = {
         start(getStartTime())
       }
     
    -  def stop() {
    -    thread.interrupt() 
    +  /**
    +   * Stop the timer. stopAfterNextCallback = true makes it wait for the next callback to be completed.
    +   * Returns the last time when it had called back.
    +   */
    +  def stop(stopAfterNextCallback: Boolean = false): Long = synchronized {
    --- End diff --
    
    As per our discussion, let's just change it to:
    
    `def stop(interruptCallback: Boolean = true)`



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11281527
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -124,84 +143,109 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
           receivingThread
     
           // Call user-defined onStart()
    +      logInfo("Calling onStart")
    --- End diff --
    
    Will address this in PR #300 



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11229496
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala ---
    @@ -17,44 +17,80 @@
     
     package org.apache.spark.streaming.util
     
    +import org.apache.spark.Logging
    +
     private[streaming]
    -class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
    +class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
    +  extends Logging {
       
    -  private val thread = new Thread("RecurringTimer") {
    +  private val thread = new Thread("RecurringTimer - " + name) {
    +    setDaemon(true)
         override def run() { loop }    
       }
    -  
    -  private var nextTime = 0L
     
    +  private var prevTime = -1L
    +  private var nextTime = -1L
    +  private var stopped = false
    +
    +  /**
    +   * Get the earliest time when this timer can be started. The time must be a
    +   * multiple of this timer's period and more than the current time.
    +   */
       def getStartTime(): Long = {
         (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
       }
     
    +  /**
    +   * Get the earliest time when this timer can be restarted, based on the earlier start time.
    +   * The time must be a multiple of this timer's period and more than current time.
    +   */
       def getRestartTime(originalStartTime: Long): Long = {
         val gap = clock.currentTime - originalStartTime
         (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
       }
     
    -  def start(startTime: Long): Long = {
    +  /**
    +   * Start at the given start time.
    +   */
    +  def start(startTime: Long): Long = synchronized {
         nextTime = startTime
         thread.start()
    +    logInfo("Started timer for " + name + " at time " + nextTime)
         nextTime
       }
     
    +  /**
    +   * Start at the earliest time it can start based on the period.
    +   */
       def start(): Long = {
         start(getStartTime())
       }
     
    -  def stop() {
    -    thread.interrupt() 
    +  /**
    +   * Stop the timer. stopAfterNextCallback = true makes it wait for the next callback to be completed.
    +   * Returns the last time when it had called back.
    +   */
    +  def stop(stopAfterNextCallback: Boolean = false): Long = synchronized {
    +    if (!stopped) {
    --- End diff --
    
    does `stopped` need to be `volatile` here to work?
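
    For reference, a tiny sketch of the visibility concern (assuming the timer loop reads the flag without entering the monitor):

        class TimerFlagDemo {
          // stop() writes under synchronized, but if loop() reads the flag
          // outside the same monitor, @volatile is needed for the write to
          // be visible to the loop thread.
          @volatile private var stopped = false

          def stop(): Unit = synchronized { stopped = true }

          def loopStep(): Boolean = !stopped  // unsynchronized read, safe with @volatile
        }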



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39787095
  
    Build finished. 



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39802027
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11229778
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala ---
    @@ -17,44 +17,80 @@
     
     package org.apache.spark.streaming.util
     
    +import org.apache.spark.Logging
    +
     private[streaming]
    -class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
    +class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
    +  extends Logging {
       
    -  private val thread = new Thread("RecurringTimer") {
    +  private val thread = new Thread("RecurringTimer - " + name) {
    +    setDaemon(true)
         override def run() { loop }    
       }
    -  
    -  private var nextTime = 0L
     
    +  private var prevTime = -1L
    +  private var nextTime = -1L
    +  private var stopped = false
    +
    +  /**
    +   * Get the earliest time when this timer can be started. The time must be a
    +   * multiple of this timer's period and more than the current time.
    +   */
       def getStartTime(): Long = {
         (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
       }
     
    +  /**
    +   * Get the earliest time when this timer can be restarted, based on the earlier start time.
    +   * The time must be a multiple of this timer's period and more than current time.
    +   */
       def getRestartTime(originalStartTime: Long): Long = {
         val gap = clock.currentTime - originalStartTime
         (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
       }
     
    -  def start(startTime: Long): Long = {
    +  /**
    +   * Start at the given start time.
    +   */
    +  def start(startTime: Long): Long = synchronized {
         nextTime = startTime
         thread.start()
    +    logInfo("Started timer for " + name + " at time " + nextTime)
         nextTime
       }
     
    +  /**
    +   * Start at the earliest time it can start based on the period.
    +   */
       def start(): Long = {
         start(getStartTime())
       }
     
    -  def stop() {
    -    thread.interrupt() 
    +  /**
    +   * Stop the timer. stopAfterNextCallback = true makes it wait for the next callback to be completed.
    +   * Returns the last time when it had called back.
    +   */
    +  def stop(stopAfterNextCallback: Boolean = false): Long = synchronized {
    --- End diff --
    
    I'm a bit unsure of the semantics of `stopAfterNextCallback`. What if, for instance, the other thread is in the middle of executing a callback when `stop(true)` is called here? Then won't it just finish the _currently executing_ callback?
    
    It seems like the semantics are something like:
    
    (a) If there is a callback currently being executed, it will be allowed to finish, and then the timer will stop.
    (b) If we are currently waiting in-between callbacks, then we will fully execute a single additional callback and then the timer will stop.
    
    Am I understanding that correctly? Those semantics seem pretty convoluted to me. What about just checking `stopped` again after you wake up from sleeping? Then you could have the flag provide just (a) but not (b).
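
    A sketch of that loop as a standalone rendering (MyClock stands in for the streaming Clock, whose waitTillTime is assumed to block until the given time):

        trait MyClock { def waitTillTime(target: Long): Long }

        class TimerLoopSketch(clock: MyClock, period: Long, callback: Long => Unit) {
          @volatile private var stopped = false
          private var nextTime = -1L

          def start(firstTime: Long): Unit = { nextTime = firstTime }  // then run loop() in a thread
          def stop(): Unit = { stopped = true }

          private def loop(): Unit = {
            while (!stopped) {
              clock.waitTillTime(nextTime)
              // Re-check after waking: if stop() ran in the meantime, skip
              // the extra callback, giving semantics (a) but not (b).
              if (!stopped) {
                callback(nextTime)
                nextTime += period
              }
            }
          }
        }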
    




[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11227952
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -112,7 +131,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
       def getLocationPreference() : Option[String] = None
     
       /**
    -   * Starts the receiver. First is accesses all the lazy members to
    +   * Start the receiver. First it accesses all the lazy members to
    --- End diff --
    
    should the below method `start()` be private?



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11281533
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -124,84 +143,109 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
           receivingThread
     
           // Call user-defined onStart()
    +      logInfo("Calling onStart")
           onStart()
    +
    +      // Wait until interrupt is called on this thread
    +      while(true) Thread.sleep(100000)
    --- End diff --
    
    Will address this in PR #300 



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11282456
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---
    @@ -77,30 +82,79 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
         }
       }
     
    -  /** Stop generation of jobs */
    -  def stop() = synchronized {
    -    if (eventActor != null) {
    +  /**
    +   * Stop generation of jobs. processAllReceivedData = true makes this wait until the jobs
    +   * of the current ongoing time interval have been generated and processed, and the
    +   * corresponding checkpoints written.
    +   */
    +  def stop(processAllReceivedData: Boolean): Unit = synchronized {
    --- End diff --
    
    Done.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11371037
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala ---
    @@ -50,36 +50,54 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       private var eventActor: ActorRef = null
     
     
    -  def start() = synchronized {
    -    if (eventActor != null) {
    -      throw new SparkException("JobScheduler already started")
    -    }
    +  def start(): Unit = synchronized {
    +    if (eventActor != null) return // scheduler has already been started
     
    +    logDebug("Starting JobScheduler")
         eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
           def receive = {
             case event: JobSchedulerEvent => processEvent(event)
           }
         }), "JobScheduler")
    +
         listenerBus.start()
         networkInputTracker = new NetworkInputTracker(ssc)
         networkInputTracker.start()
    -    Thread.sleep(1000)
         jobGenerator.start()
    -    logInfo("JobScheduler started")
    +    logInfo("Started JobScheduler")
       }
     
    -  def stop() = synchronized {
    -    if (eventActor != null) {
    -      jobGenerator.stop()
    -      networkInputTracker.stop()
    -      executor.shutdown()
    -      if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
    -        executor.shutdownNow()
    -      }
    -      listenerBus.stop()
    -      ssc.env.actorSystem.stop(eventActor)
    -      logInfo("JobScheduler stopped")
    +  def stop(processAllReceivedData: Boolean): Unit = synchronized {
    +    if (eventActor == null) return // scheduler has already been stopped
    +    logDebug("Stopping JobScheduler")
    +
    +    // First, stop receiving
    +    networkInputTracker.stop()
    +
    +    // Second, stop generating jobs. If it has to process all received data,
    +    // then this will wait for all the processing through JobScheduler to be over.
    +    jobGenerator.stop(processAllReceivedData)
    +
    +    // Stop the executor for receiving new jobs
    +    logDebug("Stopping job executor")
    +    jobExecutor.shutdown()
    +
    +    // Wait for the queued jobs to complete if indicated
    +    val terminated = if (processAllReceivedData) {
    +      jobExecutor.awaitTermination(1, TimeUnit.HOURS)  // just a very large period of time
    +    } else {
    +      jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
         }
    +    if (!terminated) {
    +      jobExecutor.shutdownNow()
    +    }
    +    logDebug("Stopped job executor")
    +
    +    // Stop everything else
    +    listenerBus.stop()
    +    ssc.env.actorSystem.stop(eventActor)
    +    eventActor == null
    --- End diff --
    
    Fixed.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11235082
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---
    @@ -77,30 +82,79 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
         }
       }
     
    -  /** Stop generation of jobs */
    -  def stop() = synchronized {
    -    if (eventActor != null) {
    +  /**
    +   * Stop generation of jobs. processAllReceivedData = true makes this wait until the jobs
    +   * of the current ongoing time interval have been generated and processed, and the
    +   * corresponding checkpoints written.
    +   */
    +  def stop(processAllReceivedData: Boolean): Unit = synchronized {
    +    if (eventActor == null) return // generator has already been stopped
    +
    +    if (processAllReceivedData) {
    +      logInfo("Stopping JobGenerator gracefully")
    +      val timeWhenStopStarted = System.currentTimeMillis()
    +      val stopTimeout = 10 * ssc.graph.batchDuration.milliseconds
    +      val pollTime = 100
    +
    +      // To prevent the graceful stop from getting stuck permanently
    +      def hasTimedOut = {
    +        val timedOut = System.currentTimeMillis() - timeWhenStopStarted > stopTimeout
    +        if (timedOut) logWarning("Timed out while stopping the job generator")
    +        timedOut
    +      }
    +
    +      // Wait until all the received blocks in the network input tracker have
    +      // been consumed by network input DStreams, and jobs have been generated with them
    +      logInfo("Waiting for all received blocks to be consumed for job generation")
    +      while(!hasTimedOut && jobScheduler.networkInputTracker.hasMoreReceivedBlockIds) {
    +        Thread.sleep(pollTime)
    +      }
    +      logInfo("Waited for all received blocsk to be consumed for job generation")
    +
    +      // Stop generating jobs
    +      val stopTime = timer.stop(stopAfterNextCallback = true)
    +      graph.stop()
    +      logInfo("Stopped generation timer")
    +
    +      // Wait for the jobs to complete and checkpoints to be written
    +      def hasAllBatchesBeenFullyProcessed = {
    --- End diff --
    
    I think this should be `haveAllBatchesBeenFullyProcessed` or maybe `allBatchesProcessed` for conciseness.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11234745
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -124,84 +143,109 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
           receivingThread
     
           // Call user-defined onStart()
    +      logInfo("Calling onStart")
           onStart()
    +
    +      // Wait until interrupt is called on this thread
    +      while(true) Thread.sleep(100000)
         } catch {
           case ie: InterruptedException =>
    -        logInfo("Receiving thread interrupted")
    -        //println("Receiving thread interrupted")
    +        logInfo("Receiving thread has been interrupted, receiver "  + streamId + " stopped")
           case e: Exception =>
    -        stopOnError(e)
    +        logError("Error receiving data in receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    // Call user-defined onStop()
    +    logInfo("Calling onStop")
    +    try {
    +      onStop()
    +    } catch {
    +      case  e: Exception =>
    +        logError("Error stopping receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    val message = if (exceptions.isEmpty) {
    +      null
    +    } else if (exceptions.size == 1) {
    +      val e = exceptions.head
    +      "Exception in receiver " + streamId + ": " + e.getMessage + "\n" + e.getStackTraceString
    +    } else {
    +      "Multiple exceptions in receiver " + streamId + "(" + exceptions.size + "):\n"
    +        exceptions.zipWithIndex.map {
    +          case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
    +        }.mkString("\n")
         }
    +    logInfo("Deregistering receiver " + streamId)
    +    val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
    +    Await.result(future, askTimeout)
    +    logInfo("Deregistered receiver " + streamId)
    +    env.actorSystem.stop(actor)
    +    logInfo("Stopped receiver " + streamId)
       }
     
       /**
    -   * Stops the receiver. First it interrupts the main receiving thread,
    +   * Stop the receiver. First it interrupts the main receiving thread,
        * that is, the thread that called receiver.start(). Then it calls the user-defined
    --- End diff --
    
    This javadoc is outdated now, isn't it?



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39799987
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11234823
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -124,84 +143,109 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
           receivingThread
     
           // Call user-defined onStart()
    +      logInfo("Calling onStart")
           onStart()
    +
    +      // Wait until interrupt is called on this thread
    +      while(true) Thread.sleep(100000)
         } catch {
           case ie: InterruptedException =>
    -        logInfo("Receiving thread interrupted")
    -        //println("Receiving thread interrupted")
    +        logInfo("Receiving thread has been interrupted, receiver "  + streamId + " stopped")
           case e: Exception =>
    -        stopOnError(e)
    +        logError("Error receiving data in receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    // Call user-defined onStop()
    +    logInfo("Calling onStop")
    +    try {
    +      onStop()
    +    } catch {
    +      case  e: Exception =>
    +        logError("Error stopping receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    val message = if (exceptions.isEmpty) {
    +      null
    +    } else if (exceptions.size == 1) {
    +      val e = exceptions.head
    +      "Exception in receiver " + streamId + ": " + e.getMessage + "\n" + e.getStackTraceString
    +    } else {
    +      "Multiple exceptions in receiver " + streamId + "(" + exceptions.size + "):\n"
    +        exceptions.zipWithIndex.map {
    +          case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
    +        }.mkString("\n")
         }
    +    logInfo("Deregistering receiver " + streamId)
    +    val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
    +    Await.result(future, askTimeout)
    +    logInfo("Deregistered receiver " + streamId)
    +    env.actorSystem.stop(actor)
    +    logInfo("Stopped receiver " + streamId)
       }
     
       /**
    -   * Stops the receiver. First it interrupts the main receiving thread,
    +   * Stop the receiver. First it interrupts the main receiving thread,
        * that is, the thread that called receiver.start(). Then it calls the user-defined
        * onStop() method to stop other threads and/or do cleanup.
        */
       def stop() {
    +    // Stop receiving by interrupting the receiving thread
         receivingThread.interrupt()
    --- End diff --
    
    This is a bit confusing - is the receivingThread assumed to be running user code, or is the assumption that it's sleeping in the `Thread.sleep(...)` line? I think it would simplify things a lot if you could assume that `onStart` doesn't block. 
    
    Also, could you have a condition variable here instead of using sleep/interruption? 
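
    One latch-based shape of that (hypothetical names): the receiving thread parks on a latch after onStart() returns, and stop() releases it without interrupting user code.

        import java.util.concurrent.CountDownLatch

        class ReceiverStopLatch {
          private val stopLatch = new CountDownLatch(1)

          // The receiving thread calls this after onStart() returns.
          def awaitStop(): Unit = stopLatch.await()

          // stop() calls this instead of Thread.interrupt().
          def signalStop(): Unit = stopLatch.countDown()
        }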



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39401108
  
    @witgo I completely agree that a better lifecycle interface is necessary. That has been addressed by a different PR #300. This PR only adds the changes necessary to the other components of the system (NetworkInputTracker, JobScheduler, JobGenerator, etc.) to allow graceful shutdown.




[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11319581
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---
    @@ -133,18 +147,61 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
         ssc.start()
         ssc.stop()
         ssc.stop()
    -    ssc = null
       }
     
    +  test("stop before start and start after stop") {
    +    ssc = new StreamingContext(master, appName, batchDuration)
    +    addInputStream(ssc).register
    +    ssc.stop()  // stop before start should not throw exception
    +    ssc.start()
    +    ssc.stop()
    +    intercept[SparkException] {
    +      ssc.start() // start after stop should throw exception
    +    }
    +  }
    +
    +
       test("stop only streaming context") {
         ssc = new StreamingContext(master, appName, batchDuration)
         sc = ssc.sparkContext
         addInputStream(ssc).register
         ssc.start()
         ssc.stop(false)
    -    ssc = null
         assert(sc.makeRDD(1 to 100).collect().size === 100)
         ssc = new StreamingContext(sc, batchDuration)
    +    addInputStream(ssc).register
    +    ssc.start()
    +    ssc.stop()
    +  }
    +
    +  test("stop gracefully") {
    --- End diff --
    
    This test takes 12 seconds, which is pretty long. Is there any way we can make it shorter?
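
    One option, assuming the manual clock from the streaming test utilities is available (an assumption, not something this PR adds), is to drive batches explicitly instead of waiting in real time:

        // Assumed test-utility hook: swap in ManualClock before creating the
        // StreamingContext, then advance it from the test instead of sleeping.
        System.setProperty("spark.streaming.clock",
          "org.apache.spark.streaming.util.ManualClock")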



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39817343
  
    Looks good TD, merging this.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11282497
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---
    @@ -77,30 +82,79 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
         }
       }
     
    -  /** Stop generation of jobs */
    -  def stop() = synchronized {
    -    if (eventActor != null) {
    +  /**
    +   * Stop generation of jobs. processAllReceivedData = true makes this wait until the jobs
    +   * of the current ongoing time interval have been generated and processed, and the
    +   * corresponding checkpoints written.
    +   */
    +  def stop(processAllReceivedData: Boolean): Unit = synchronized {
    +    if (eventActor == null) return // generator has already been stopped
    +
    +    if (processAllReceivedData) {
    +      logInfo("Stopping JobGenerator gracefully")
    +      val timeWhenStopStarted = System.currentTimeMillis()
    +      val stopTimeout = 10 * ssc.graph.batchDuration.milliseconds
    +      val pollTime = 100
    +
    +      // To prevent the graceful stop from getting stuck permanently
    +      def hasTimedOut = {
    +        val timedOut = System.currentTimeMillis() - timeWhenStopStarted > stopTimeout
    +        if (timedOut) logWarning("Timed out while stopping the job generator")
    +        timedOut
    +      }
    +
    +      // Wait until all the received blocks in the network input tracker have
    +      // been consumed by network input DStreams, and jobs have been generated with them
    +      logInfo("Waiting for all received blocks to be consumed for job generation")
    +      while(!hasTimedOut && jobScheduler.networkInputTracker.hasMoreReceivedBlockIds) {
    +        Thread.sleep(pollTime)
    +      }
    +      logInfo("Waited for all received blocsk to be consumed for job generation")
    +
    +      // Stop generating jobs
    +      val stopTime = timer.stop(stopAfterNextCallback = true)
    +      graph.stop()
    +      logInfo("Stopped generation timer")
    +
    +      // Wait for the jobs to complete and checkpoints to be written
    +      def hasAllBatchesBeenFullyProcessed = {
    --- End diff --
    
    haveAllBatchesBeenProcessed: slightly shorter, and grammatically better than allBatchesProcessed.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11367919
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---
    @@ -21,15 +21,17 @@ import org.scalatest.{FunSuite, BeforeAndAfter}
     import org.scalatest.exceptions.TestFailedDueToTimeoutException
     import org.scalatest.concurrent.Timeouts
     import org.scalatest.time.SpanSugar._
    -import org.apache.spark.{SparkException, SparkConf, SparkContext}
    +import org.apache.spark.{Logging, SparkException, SparkConf, SparkContext}
     import org.apache.spark.util.{Utils, MetadataCleaner}
    -import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.dstream.{NetworkReceiver, DStream}
    +import java.util.concurrent.atomic.AtomicInteger
    --- End diff --
    
    nit: move this up



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39799424
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-38765199
  
     Build triggered.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11282396
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -124,84 +143,109 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
           receivingThread
     
           // Call user-defined onStart()
    +      logInfo("Calling onStart")
           onStart()
    +
    +      // Wait until interrupt is called on this thread
    +      while(true) Thread.sleep(100000)
         } catch {
           case ie: InterruptedException =>
    -        logInfo("Receiving thread interrupted")
    -        //println("Receiving thread interrupted")
    +        logInfo("Receiving thread has been interrupted, receiver "  + streamId + " stopped")
           case e: Exception =>
    -        stopOnError(e)
    +        logError("Error receiving data in receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    // Call user-defined onStop()
    +    logInfo("Calling onStop")
    +    try {
    +      onStop()
    +    } catch {
    +      case  e: Exception =>
    +        logError("Error stopping receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    val message = if (exceptions.isEmpty) {
    +      null
    +    } else if (exceptions.size == 1) {
    +      val e = exceptions.head
    +      "Exception in receiver " + streamId + ": " + e.getMessage + "\n" + e.getStackTraceString
    +    } else {
    +      "Multiple exceptions in receiver " + streamId + "(" + exceptions.size + "):\n"
    +        exceptions.zipWithIndex.map {
    +          case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
    +        }.mkString("\n")
         }
    +    logInfo("Deregistering receiver " + streamId)
    +    val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
    +    Await.result(future, askTimeout)
    +    logInfo("Deregistered receiver " + streamId)
    +    env.actorSystem.stop(actor)
    +    logInfo("Stopped receiver " + streamId)
       }
     
       /**
    -   * Stops the receiver. First it interrupts the main receiving thread,
    +   * Stop the receiver. First it interrupts the main receiving thread,
        * that is, the thread that called receiver.start(). Then it calls the user-defined
    --- End diff --
    
    It says "Then it calls the user-defined onStop method" but in this change you remove that call.



[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11282410
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---
    @@ -124,84 +143,109 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
           receivingThread
     
           // Call user-defined onStart()
    +      logInfo("Calling onStart")
           onStart()
    +
    +      // Wait until interrupt is called on this thread
    +      while(true) Thread.sleep(100000)
         } catch {
           case ie: InterruptedException =>
    -        logInfo("Receiving thread interrupted")
    -        //println("Receiving thread interrupted")
    +        logInfo("Receiving thread has been interrupted, receiver "  + streamId + " stopped")
           case e: Exception =>
    -        stopOnError(e)
    +        logError("Error receiving data in receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    // Call user-defined onStop()
    +    logInfo("Calling onStop")
    +    try {
    +      onStop()
    +    } catch {
    +      case  e: Exception =>
    +        logError("Error stopping receiver " + streamId, e)
    +        exceptions += e
    +    }
    +
    +    val message = if (exceptions.isEmpty) {
    +      null
    +    } else if (exceptions.size == 1) {
    +      val e = exceptions.head
    +      "Exception in receiver " + streamId + ": " + e.getMessage + "\n" + e.getStackTraceString
    +    } else {
    +      "Multiple exceptions in receiver " + streamId + "(" + exceptions.size + "):\n"
    +        exceptions.zipWithIndex.map {
    +          case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
    +        }.mkString("\n")
         }
    +    logInfo("Deregistering receiver " + streamId)
    +    val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
    +    Await.result(future, askTimeout)
    +    logInfo("Deregistered receiver " + streamId)
    +    env.actorSystem.stop(actor)
    +    logInfo("Stopped receiver " + streamId)
       }
     
       /**
    -   * Stops the receiver. First it interrupts the main receiving thread,
    +   * Stop the receiver. First it interrupts the main receiving thread,
        * that is, the thread that called receiver.start(). Then it calls the user-defined
        * onStop() method to stop other threads and/or do cleanup.
        */
       def stop() {
    +    // Stop receiving by interrupting the receiving thread
         receivingThread.interrupt()
    -    onStop()
    -    //TODO: terminate the actor
    +    logInfo("Interrupted receiving thread " + receivingThread + " for stopping")
       }
     
       /**
    -   * Stops the receiver and reports exception to the tracker.
    +   * Stop the receiver and report the exception to the tracker.
        * This should be called whenever an exception is to be handled on any thread
        * of the receiver.
        */
       protected def stopOnError(e: Exception) {
         logError("Error receiving data", e)
    +    exceptions += e
         stop()
    -    actor ! ReportError(e.toString)
       }
     
    -
       /**
    -   * Pushes a block (as an ArrayBuffer filled with data) into the block manager.
    +   * Push a block (as an ArrayBuffer filled with data) into the block manager.
        */
       def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
    +    logInfo("Block " + blockId + " has last element as " + arrayBuffer.last)
    --- End diff --
    
    Removed this.
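
    An aside on the quoted diff: in the multi-exception branch, the header
    string and the mapped exception list are two separate statements, so the
    header ("Multiple exceptions in receiver ...") is discarded and only the
    mkString result becomes the message. A sketch of that branch with the
    missing "+" restored:

        } else {
          "Multiple exceptions in receiver " + streamId + " (" + exceptions.size + "):\n" +
            exceptions.zipWithIndex.map {
              case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
            }.mkString("\n")
        }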


[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11371036
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---
    @@ -77,30 +82,79 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
         }
       }
     
    -  /** Stop generation of jobs */
    -  def stop() = synchronized {
    -    if (eventActor != null) {
    -      timer.stop()
    -      ssc.env.actorSystem.stop(eventActor)
    -      if (checkpointWriter != null) checkpointWriter.stop()
    -      ssc.graph.stop()
    -      logInfo("JobGenerator stopped")
    +  /**
    +   * Stop generation of jobs. processReceivedData = true makes this wait until jobs
    +   * of the current ongoing time interval have been generated and processed, and the
    +   * corresponding checkpoints written.
    +   */
    +  def stop(processReceivedData: Boolean): Unit = synchronized {
    +    if (eventActor == null) return // generator has already been stopped
    +
    +    if (processReceivedData) {
    +      logInfo("Stopping JobGenerator gracefully")
    +      val timeWhenStopStarted = System.currentTimeMillis()
    +      val stopTimeout = 10 * ssc.graph.batchDuration.milliseconds
    +      val pollTime = 100
    +
    +      // To prevent the graceful stop from getting stuck permanently
    +      def hasTimedOut = {
    +        val timedOut = System.currentTimeMillis() - timeWhenStopStarted > stopTimeout
    +        if (timedOut) logWarning("Timed out while stopping the job generator")
    +        timedOut
    +      }
    +
    +      // Wait until all the received blocks in the network input tracker has
    +      // been consumed by network input DStreams, and jobs have been generated with them
    +      logInfo("Waiting for all received blocks to be consumed for job generation")
    +      while(!hasTimedOut && jobScheduler.networkInputTracker.hasMoreReceivedBlockIds) {
    +        Thread.sleep(pollTime)
    +      }
    +      logInfo("Waited for all received blocsk to be consumed for job generation")
    --- End diff --
    
    Fixed
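
    The poll-until-done-or-timeout pattern above generalizes; a hypothetical
    helper (not part of the PR) capturing it:

        // Block until `condition` holds or `timeoutMs` elapses; returns
        // whether the condition was met before the deadline.
        def waitUntil(condition: => Boolean, timeoutMs: Long, pollMs: Long = 100): Boolean = {
          val deadline = System.currentTimeMillis() + timeoutMs
          while (!condition && System.currentTimeMillis() < deadline) {
            Thread.sleep(pollMs)
          }
          condition
        }

        // e.g. waitUntil(!jobScheduler.networkInputTracker.hasMoreReceivedBlockIds,
        //                10 * ssc.graph.batchDuration.milliseconds)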


[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-38854653
  
    Jenkins, test this again.


[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11227580
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -430,12 +448,27 @@ class StreamingContext private[streaming] (
       /**
        * Stop the execution of the streams.
        * @param stopSparkContext Stop the associated SparkContext or not
    +   * @param stopGracefully Stop gracefully by waiting for the processing of all
    +   *                       received data to be completed
        */
    -  def stop(stopSparkContext: Boolean = true) = synchronized {
    -    scheduler.stop()
    +  def stop(
    --- End diff --
    
    This is a compatibility-breaking API change, right? I.e. code compiled against the old stop() will fail... we could fix this by adding another overload. Or maybe we just punt on it for 1.0.
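
    One way to preserve the old signature would be to keep a default-argument
    overload that delegates to the new one; a sketch, not necessarily what was
    merged:

        /** Old entry point: stop the streams (and SparkContext), non-gracefully. */
        def stop(stopSparkContext: Boolean = true): Unit = {
          stop(stopSparkContext, stopGracefully = false)
        }

        def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
          // ... shutdown logic, graceful or not ...
        }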


[GitHub] spark pull request: [SPARK-1331] Added graceful shutdown to Spark ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/247#issuecomment-39534741
  
    Build finished. 

