Posted to user@spark.apache.org by "Budde, Adam" <bu...@amazon.com> on 2014/12/15 22:32:54 UTC

Stop streaming context gracefully when SIGTERM is passed

Hi all,

We are using Spark Streaming to ETL a large volume of time-series datasets. In our current design, each dataset we ETL will have a corresponding Spark Streaming context + process running on our cluster. Each of these processes will be passed configuration options specifying the data source to process as well as various tuning parameters, such as the number of Receiver objects to use, batch interval size, number of partitions, etc.
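
A minimal sketch of what that per-dataset setup could look like (the option names, the socket source, and the defaults are illustrative assumptions, not Adam's actual code):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical per-dataset tuning options passed in at submit time.
val conf          = new SparkConf().setAppName("dataset-etl")
val numReceivers  = conf.getInt("etl.numReceivers", 2)
val batchSeconds  = conf.getInt("etl.batchSeconds", 15)
val numPartitions = conf.getInt("etl.numPartitions", 32)

val ssc = new StreamingContext(conf, Seconds(batchSeconds))

// One receiver-backed input stream per configured receiver (a socket
// source stands in for the real data source), unioned and repartitioned
// to the configured parallelism before the ETL transformations.
val streams  = (1 to numReceivers).map(_ => ssc.socketTextStream("source-host", 9999))
val combined = ssc.union(streams).repartition(numPartitions)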

Since the volume of data we're ingesting for each dataset will fluctuate over time, we'd like to be able to regularly send a SIGTERM to the Spark Streaming process handling the ETL, have that process gracefully complete processing any in-flight data, and restart the process with updated configuration options. The most obvious solution seems to be to call the stop(stopSparkContext: Boolean, stopGracefully: Boolean) method provided by StreamingContext in a shutdown hook, but this approach doesn't seem to be working for me. Here's a rough idea of what my code looks like:

> val ssc = new StreamingContext(conf, Seconds(15))
>
> ...
>
> // Add shutdown hook to exit gracefully upon termination.
> Runtime.getRuntime().addShutdownHook(new Thread() with Logging {
>   override def run() = {
>     logInfo("Exiting gracefully...")
>     ssc.stop(true, true)
>   }
> })
>
> ...
>
> ssc.start()
> ssc.awaitTermination()

Whenever I try to kill the process, I don't see the "Exiting gracefully..." log message I've added. I tried grokking through the Spark source code to see if some other shutdown hook might be squashing the one I've added by causing the process to exit before my hook is invoked, but I haven't found anything suspicious yet. Does anybody have any advice or insight on this? I'm a bit of a novice when it comes to the JVM, and I'm afraid I'm reaching the limits of my diagnostic abilities here.
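
One commonly suggested workaround is to keep the graceful stop out of the shutdown hook itself and drive it from the main thread instead. A minimal sketch, assuming StreamingContext.awaitTerminationOrTimeout is available in your Spark version (later Spark releases also added a spark.streaming.stopGracefullyOnShutdown setting for exactly this case):

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

val stopRequested = new AtomicBoolean(false)
val stopLatch     = new CountDownLatch(1)

// The hook only records that a SIGTERM arrived; it then blocks the
// JVM's shutdown sequence until the main thread finishes the stop.
sys.addShutdownHook {
  stopRequested.set(true)
  stopLatch.await()
}

ssc.start()
// awaitTerminationOrTimeout returns false while the context is still
// running, so this loop wakes up every 10 seconds to check the flag.
while (!stopRequested.get && !ssc.awaitTerminationOrTimeout(10000)) {}
if (stopRequested.get) {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}
stopLatch.countDown()

The latch matters because the JVM only waits for shutdown hooks to finish, not for the main thread, so without it the process could exit before the graceful stop completes.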

Thanks,
Adam

Re: Stop streaming context gracefully when SIGTERM is passed

Posted by Soumitra Kumar <ku...@gmail.com>.
Hi Adam,

I have the following Scala actor-based code to do a graceful shutdown:

import scala.actors.{Actor, TIMEOUT}
import org.apache.spark.Logging
import org.apache.spark.streaming.StreamingContext

// Message asking SSCReactor to stop the streaming context.
case object SHUTDOWN

// Sends SHUTDOWN to `who` once `timeout` milliseconds have elapsed.
class TimerActor(val timeout: Long, val who: Actor) extends Actor {
    def act() {
        reactWithin(timeout) {
            case TIMEOUT => who ! SHUTDOWN
        }
    }
}

// Stops the streaming context gracefully upon receiving SHUTDOWN.
class SSCReactor(val ssc: StreamingContext) extends Actor with Logging {
    def act() {
        react {
            case SHUTDOWN =>
                logInfo("Shutting down gracefully ...")
                ssc.stop(true, true)
        }
    }
}
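
A hypothetical wiring of the two actors (the one-hour timeout and the start order are illustrative assumptions, not from the original post):

val reactor = new SSCReactor(ssc)
reactor.start()
// Ask the reactor to stop the context gracefully after one hour.
new TimerActor(60 * 60 * 1000L, reactor).start()

ssc.start()
ssc.awaitTermination()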

I see the following messages:

14/10/22 01:40:49 INFO SSCReactor: Shutting down gracefully ...
14/10/22 01:40:49 INFO JobGenerator: Stopping JobGenerator gracefully
14/10/22 01:40:49 INFO JobGenerator: Waiting for all received blocks to be consumed for job generation
14/10/22 01:40:49 INFO JobGenerator: Waited for all received blocks to be consumed for job generation

-Soumitra.

