Posted to user@spark.apache.org by Michal Čizmazia <mi...@gmail.com> on 2015/10/07 17:33:39 UTC

Graceful shutdown drops processing in Spark Streaming

After triggering a graceful shutdown of the following application, the
application stops before the windowed stream reaches its slide duration. As
a result, the received data is not completely processed (i.e. saveToMyStorage
is never called) before shutdown.

According to the documentation, a graceful shutdown should ensure that all
received data is completely processed before shutdown:
https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code

Spark version: 1.4.1

Code snippet:

Function0<JavaStreamingContext> factory = () -> {
    JavaStreamingContext context =
        new JavaStreamingContext(sparkConf, Durations.minutes(1));
    context.checkpoint("/test");
    JavaDStream<String> records =
        context.receiverStream(myReliableReceiver).flatMap(...);
    records.persist(StorageLevel.MEMORY_AND_DISK());
    // Force each 1-minute batch to be materialized.
    records.foreachRDD(rdd -> { rdd.count(); return null; });
    // Non-overlapping 15-minute windows; saveToMyStorage should run once
    // per window.
    records
        .window(Durations.minutes(15), Durations.minutes(15))
        .foreachRDD(rdd -> { saveToMyStorage(rdd); return null; });
    return context;
};

try (JavaStreamingContext context =
        JavaStreamingContext.getOrCreate("/test", factory)) {
    context.start();
    waitForShutdownSignal();
    boolean stopSparkContext = true;
    boolean stopGracefully = true;
    context.stop(stopSparkContext, stopGracefully);
}
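
For reference, Spark 1.4 also added a spark.streaming.stopGracefullyOnShutdown
setting (SPARK-7776) that installs a JVM shutdown hook to stop the context
gracefully, so the explicit stop() on a signal can be replaced with
awaitTermination(). A minimal sketch, assuming the same sparkConf and factory
as above; presumably it would be subject to the same windowing issue:

// Sketch: rely on Spark's shutdown hook for the graceful stop instead of
// calling stop() explicitly when a signal arrives.
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true");

try (JavaStreamingContext context =
        JavaStreamingContext.getOrCreate("/test", factory)) {
    context.start();
    // Blocks until the context is stopped, e.g. by the hook on SIGTERM.
    context.awaitTermination();
}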

Re: Graceful shutdown drops processing in Spark Streaming

Posted by Michal Čizmazia <mi...@gmail.com>.
Thanks! Done.

https://issues.apache.org/jira/browse/SPARK-10995

Re: Graceful shutdown drops processing in Spark Streaming

Posted by Tathagata Das <td...@databricks.com>.
Aaah, interesting, you are using a 15-minute slide duration. Yeah, internally
the streaming scheduler waits for the last "batch" interval that has data
to be processed, but if there is a sliding interval (i.e. 15 mins) that is
longer than the batch interval, the final window batch might not be run.
This is indeed a bug and should be fixed. Mind setting up a JIRA and
assigning it to me?
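
Until the fix lands, one possible workaround (a rough sketch, not from this
thread) is to delay stop() until just past the next slide boundary, so the
final window batch is generated and then drained by the graceful stop. It
assumes window boundaries fall on wall-clock multiples of the slide duration;
in reality batch times are aligned to the context's start time, so the offset
may need adjusting:

// Hypothetical workaround: sleep until just past the next 15-minute slide
// boundary, then stop gracefully so the last window batch gets processed.
long slideMs = Durations.minutes(15).milliseconds();
long waitMs = slideMs - (System.currentTimeMillis() % slideMs);
try {
    Thread.sleep(waitMs + 5000); // small buffer past the boundary (assumption)
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
context.stop(true, true); // stopSparkContext = true, stopGracefully = true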
