Posted to user@spark.apache.org by Shushant Arora <sh...@gmail.com> on 2015/08/07 05:26:20 UTC

stopping spark stream app

Hi

I am using Spark Streaming 1.3 with a custom checkpoint to save Kafka
offsets.

1. Is doing

Runtime.getRuntime().addShutdownHook(new Thread() {
  @Override
  public void run() {
    // stop(stopSparkContext = true, stopGracefully = true) waits for the
    // in-flight batch to complete before shutting down
    jssc.stop(true, true);
    System.out.println("Inside Add Shutdown Hook");
  }
});

a safe way to handle stopping?

2. And do I need to handle saving the checkpoint in the shutdown hook as
well, or will the driver handle it automatically, since it gracefully stops
the stream and handles completion of the foreachRDD function on the stream?

directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
  // ... process batch, save offsets ...
});
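
For reference, the shape of the custom checkpoint I have in mind is roughly
the following (saveOffsets() is my own helper, not a Spark API):

import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
  @Override
  public Void call(JavaRDD<byte[][]> rdd) throws Exception {
    // RDDs produced by the direct stream expose the Kafka offset ranges
    // they were built from.
    OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    // ... process the batch ...
    saveOffsets(offsets);  // my own helper that persists the offsets
    return null;
  }
});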

Thanks

Re: stopping spark stream app

Posted by agateaaa <ag...@gmail.com>.
Hi,

We recently started working on using Spark Streaming to fetch and process
data from Kafka (direct streaming, not receiver based; Spark 1.5.2).
We want to be able to stop the streaming application, and tried implementing
the approach suggested above, using a stopping thread and calling
ssc.stop(True, True) so that we don't lose any data that is being processed.
We are seeing that the Spark application tries to shut down but never exits.

16/01/29 18:14:47 INFO scheduler.JobGenerator: Stopping JobGenerator
gracefully
16/01/29 18:14:47 INFO scheduler.JobGenerator: Waiting for all received
blocks to be consumed for job generation
16/01/29 18:14:47 INFO scheduler.JobGenerator: Waited for all received
blocks to be consumed for job generation

[...]

16/01/29 18:20:11 INFO spark.ContextCleaner: Cleaned accumulator 6
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on 192.168.10.4:38821 in memory (size: 3.3 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on 192.168.10.5:46085 in memory (size: 3.3 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO spark.ContextCleaner: Cleaned accumulator 5
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0
on 192.168.10.4:38821 in memory (size: 5.6 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0
on 192.168.10.4:41701 in memory (size: 5.6 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO spark.ContextCleaner: Cleaned accumulator 4
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0
on 192.168.10.4:38821 in memory (size: 3.3 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0
on 192.168.10.4:41701 in memory (size: 3.3 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO spark.ContextCleaner: Cleaned accumulator 3
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0
on 192.168.10.4:38821 in memory (size: 3.3 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0
on 192.168.10.4:41701 in memory (size: 3.3 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO spark.ContextCleaner: Cleaned accumulator 2

At this point we had to kill the Spark app and kill the driver and
spark-submit process.

Would appreciate it if anyone can give any pointers on how to shut down a
streaming app.

Thanks
Agatea




Re: stopping spark stream app

Posted by Tathagata Das <td...@databricks.com>.
stop() is a blocking method when stopGracefully is set to true. In that case,
it obviously waits for all batches with data to complete processing.
Why are you joining on the thread in the streaming listener? The listener is
just a callback listener and is NOT supposed to do any long-running
blocking stuff.
If your intention is to call stop() in listener.onBatchCompleted just to
prevent the next batch from starting, that is WRONG. The listener is issued
callbacks asynchronously to the processing loop of the context.
As I said earlier, ssc.stop() does not need to be (and in fact, in most
cases, should not be) called from the listener. It should be called from
some other thread. If you have to make sure that the main program waits for
stop to complete (especially in the case of a graceful stop), then make the
main program thread wait on stopping-thread.join(). Under no circumstances
should you do blocking calls in the listener events.
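
A minimal sketch of that pattern (stoppingThread and the stop trigger are
illustrative, not part of any Spark API):

// The stopping thread performs the blocking graceful stop.
final Thread stoppingThread = new Thread(new Runnable() {
  @Override
  public void run() {
    // stop(stopSparkContext = true, stopGracefully = true) blocks until
    // all in-flight batches have completed.
    jssc.stop(true, true);
  }
});

// In the main program, after jssc.start(), once the stop signal arrives:
stoppingThread.start();
stoppingThread.join();  // main thread waits for the graceful stop to finish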

On Wed, Aug 12, 2015 at 12:13 PM, Shushant Arora <sh...@gmail.com>
wrote:

> Is streamingcontext.stop() a blocking method? I mean, does it wait for all
> the batches to complete and for all streaming listeners to complete?
> Since it may happen in the new thread that by the time sc.stop() is called
> a new batch has already started because of a race condition, so it will
> wait for the new batch's completion also.
>
> I was actually joining the streaming listener to the new thread, which
> caused the deadlock - since sc.stop() is blocking and it waits for all
> streaming listeners to complete also - right?

Re: stopping spark stream app

Posted by Tathagata Das <td...@databricks.com>.
Well, System.exit() will not ensure all data was processed before shutdown.
There should not be a deadlock if onBatchCompleted just starts the thread
(that runs stop()) and completes.
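
A sketch of that non-blocking variant (shouldStop() is a hypothetical stop
condition, not a Spark API):

public void onBatchCompleted(StreamingListenerBatchCompleted info) {
  if (shouldStop()) {
    // Start the graceful stop on a separate thread and return immediately,
    // so the listener callback itself never blocks.
    new Thread(new Runnable() {
      @Override
      public void run() {
        jssc.stop(true, true);
      }
    }).start();
    // No join() here: joining in the listener is what causes the deadlock.
  }
}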


Re: stopping spark stream app

Posted by Shushant Arora <sh...@gmail.com>.
Calling jssc.stop(false/true, false/true) from the StreamingListener causes
a deadlock, so I created another thread and called jssc.stop() from that,
but that too caused a deadlock if onBatchCompleted had not completed before
jssc.stop().

So is it safe if I call System.exit(1) from another thread without calling
jssc.stop(), since that leads to a deadlock?



Re: stopping spark stream app

Posted by Shushant Arora <sh...@gmail.com>.
Does stopping the streaming context in the onBatchCompleted event
of StreamingListener not kill the app?

I have the below code in my streaming listener:

public void onBatchCompleted(StreamingListenerBatchCompleted arg0) {
  // check stop condition
  System.out.println("stopping gracefully");
  // stop(stopSparkContext = false, stopGracefully = false)
  jssc.stop(false, false);
  System.out.println("stopped gracefully");
}

"stopped gracefully" is never printed.

On the UI no more batches are processed, but the application is never
killed/stopped. What's the best way to kill the app after stopping the
context?




Re: stopping spark stream app

Posted by Shushant Arora <sh...@gmail.com>.
Thanks!




Re: stopping spark stream app

Posted by Tathagata Das <td...@databricks.com>.
1. RPC can be done in many ways, and a web service is one of many ways. An
even more hacky version can be the app polling a file in a file system: if
the file exists, start shutting down.
2. No need to set a flag. When you get the signal from the RPC, you can just
call context.stop(stopGracefully = true). Though note that this is blocking,
so you have to be careful about doing blocking calls on the RPC thread.
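
A minimal sketch of the file-polling variant (the marker path and interval
are arbitrary examples; on YARN you would typically poll a shared location
such as an HDFS path instead of a local file):

import java.io.File;

// Poll for a marker file from the driver's main thread after jssc.start();
// awaitTerminationOrTimeout() returns true once the context has stopped.
File marker = new File("/tmp/stop-streaming-app");
while (!jssc.awaitTerminationOrTimeout(10000)) {  // re-check every 10 seconds
  if (marker.exists()) {
    jssc.stop(true, true);  // blocking graceful stop, off any callback thread
  }
}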


Re: stopping spark stream app

Posted by Shushant Arora <sh...@gmail.com>.
By RPC do you mean a web service exposed on the driver which listens and
sets some flag, and the driver checks that flag at the end of each batch
and, if set, gracefully stops the application?


Re: stopping spark stream app

Posted by Tathagata Das <td...@databricks.com>.
In general, it is a little risky to put long-running stuff in a shutdown
hook, as it may delay shutdown of the process, which may delay other things.
That said, you could try it out.

A better way to explicitly shut down gracefully is to use an RPC to signal
the driver process to start shutting down, and then the process will
gracefully stop the context and terminate. This is more robust than
leveraging shutdown hooks.
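
One possible shape for such an RPC (a sketch only; the port, path, and use
of the JDK's built-in HTTP server are illustrative choices, not anything
Spark provides):

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;

// A tiny HTTP endpoint inside the driver JVM that kicks off the graceful stop.
HttpServer server = HttpServer.create(new InetSocketAddress(7777), 0);
server.createContext("/stop", new HttpHandler() {
  @Override
  public void handle(HttpExchange exchange) throws IOException {
    exchange.sendResponseHeaders(200, -1);  // acknowledge first; stop() blocks
    exchange.close();
    // Run the blocking graceful stop off the HTTP handler thread.
    new Thread(new Runnable() {
      @Override
      public void run() {
        jssc.stop(true, true);
      }
    }).start();
  }
});
server.start();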


Re: stopping spark stream app

Posted by Shushant Arora <sh...@gmail.com>.
Any recommendations on the best way to gracefully shut down a Spark
Streaming application?
I am running it on YARN and need a way to signal it externally, either via
the yarn application -kill command or some other way, but the current batch
must be processed completely and the checkpoint saved before shutting down.

Runtime.getRuntime().addShutdownHook does not seem to be working. YARN
kills the application immediately and does not call the shutdown hook
callback.



Re: stopping spark stream app

Posted by Shushant Arora <sh...@gmail.com>.
Hi

How can I ensure, in Spark Streaming 1.3 with Kafka, that when an
application is killed, the last running batch is fully processed and
offsets are written to the checkpointing dir?
