Posted to user@flink.apache.org by M Singh <ma...@yahoo.com> on 2020/06/04 17:42:46 UTC

Stopping a job

Hi:
I am running a job which consumes data from Kinesis and sends data to another Kinesis queue. I am using an older version of Flink (1.6), and when I try to stop the job I get an exception:

Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) failed: This job is not stoppable.]

I wanted to find out what a stoppable job is, and whether it is possible to make a job stoppable if it is reading from/writing to Kinesis?
Thanks



Re: Stopping a job

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi all,

Just for future reference, there is an ongoing discussion on the topic at
another thread found in [1].
So please post any relevant comments there :)

Cheers,
Kostas

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Age-old-stop-vs-cancel-debate-td35514.html#a35615

Re: Stopping a job

Posted by M Singh <ma...@yahoo.com>.
 Thanks Kostas, Arvid, and Senthil for your help.

Re: Stopping a job

Posted by Senthil Kumar <se...@vmware.com>.
I am just stating this for completeness.

When a job is cancelled, Flink sends an interrupt signal to the thread running the Source.run method.

For some reason (unknown to me), this does not happen when a Stop command is issued.

We ran into some minor issues because of said behavior.
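
One plausible way such an issue can arise is a source loop that blocks indefinitely and only wakes up on an interrupt. The following is a minimal sketch in plain Java, not Flink code: PollingSource and stopsWithoutInterrupt are hypothetical names made up for illustration. It assumes the source reads from a blocking queue, and it polls with a timeout so that a flag-only stop is noticed even if no interrupt ever arrives.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class InterruptVsFlagSketch {

    /** Hypothetical stand-in for a source; this is not a Flink class. */
    static class PollingSource implements Runnable {
        private final BlockingQueue<String> input = new LinkedBlockingQueue<>();
        private volatile boolean running = true;

        @Override
        public void run() {
            try {
                while (running) {
                    // poll() with a timeout returns null when nothing arrives, so the
                    // loop re-checks the flag instead of blocking forever like take().
                    String record = input.poll(50, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        System.out.println("emit " + record);
                    }
                }
            } catch (InterruptedException e) {
                // Interrupt path (what the message above describes for cancel):
                // restore the interrupt status and leave the loop.
                Thread.currentThread().interrupt();
            }
        }

        /** Flag-only shutdown: nothing interrupts the running thread. */
        void stop() {
            running = false;
        }
    }

    /** Returns true if the source thread ends after a flag-only stop, with no interrupt. */
    static boolean stopsWithoutInterrupt(long timeoutMillis) {
        PollingSource source = new PollingSource();
        Thread thread = new Thread(source, "source-thread");
        thread.start();
        source.stop();
        try {
            thread.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !thread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped without interrupt: " + stopsWithoutInterrupt(2000));
    }
}
```

Had the loop used take() instead of poll() with a timeout, clearing the flag alone would not wake the thread; it would keep waiting until a record arrived or an interrupt was delivered.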

From: Kostas Kloudas <kk...@gmail.com>
Date: Monday, June 8, 2020 at 2:35 AM
To: Arvid Heise <ar...@ververica.com>
Cc: M Singh <ma...@yahoo.com>, User-Flink <us...@flink.apache.org>
Subject: Re: Stopping a job

What Arvid said is correct.
The only thing I have to add is that "stop" also allows exactly-once sinks to push out their buffered data to their final destination (e.g. Filesystem). In other words, it takes side-effects into account, so it guarantees exactly-once end-to-end, assuming that you are using exactly-once sources and sinks. Cancel with savepoint, on the other hand, did not necessarily do so; committing side-effects followed a "best-effort" approach.
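
As a rough illustration of the side-effect point, here is a toy model in plain Java. It is not a Flink sink: BufferingSink, runAndTerminate and the in-memory "destination" list are hypothetical. Cancel is modelled as the worst case of "best-effort", where the buffer is never pushed out before shutdown.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferingSinkSketch {

    /** Hypothetical sink that holds records back until they are committed. */
    static class BufferingSink {
        private final List<String> buffer = new ArrayList<>();
        // Stands in for the final destination, e.g. a file system.
        private final List<String> destination = new ArrayList<>();

        void write(String record) {
            buffer.add(record);
        }

        /** Graceful path: push out everything buffered so far, then shut down. */
        void stop() {
            destination.addAll(buffer);
            buffer.clear();
        }

        /** Abrupt path in this toy model: shut down without pushing the buffer out. */
        void cancel() {
            buffer.clear();
        }

        List<String> committed() {
            return destination;
        }
    }

    /** Writes two records, terminates the sink, and returns what reached the destination. */
    static List<String> runAndTerminate(boolean graceful) {
        BufferingSink sink = new BufferingSink();
        sink.write("a");
        sink.write("b");
        if (graceful) {
            sink.stop();
        } else {
            sink.cancel();
        }
        return sink.committed();
    }

    public static void main(String[] args) {
        System.out.println("after stop:   " + runAndTerminate(true));
        System.out.println("after cancel: " + runAndTerminate(false));
    }
}
```

In this model only the stop path leaves both records visible at the destination; what a real sink does on cancel depends on the connector.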

For more information you can check [1].

Cheers,
Kostas

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212

On Mon, Jun 8, 2020 at 10:23 AM Arvid Heise <ar...@ververica.com> wrote:
It was before I joined the dev team, so the following is somewhat speculative:

The concept of stoppable functions never really took off, as it was a bit of a clumsy approach. There is no fundamental difference between stopping and cancelling at the (sub)task level. Indeed, if you look at the Twitter source in 1.6 [1], cancel() and stop() do exactly the same thing. I'd assume that this is probably true for all sources.
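
The shape of that pattern can be sketched in plain Java. The interfaces below are local stand-ins, not the Flink classes: StoppableFunction only mirrors the single stop() method of the 1.6 interface, while CancellableSource, DemoSource, CancelOnlySource and isStoppable are hypothetical. The sketch also assumes the 1.6 rule that a job counts as stoppable only if every source implements the stop interface, which would explain the error for a source that does not.

```java
import java.util.Arrays;
import java.util.List;

public class StoppableSourceSketch {

    /** Local stand-in with the same shape as Flink 1.6's StoppableFunction. */
    interface StoppableFunction {
        void stop();
    }

    /** Local stand-in for the cancel() side of a source function. */
    interface CancellableSource {
        void cancel();
    }

    /** Hypothetical source whose two shutdown paths share one implementation. */
    static class DemoSource implements CancellableSource, StoppableFunction {
        private volatile boolean running = true;

        @Override
        public void cancel() {
            close();
        }

        @Override
        public void stop() {
            close();
        }

        private void close() {
            running = false;
        }

        boolean isRunning() {
            return running;
        }
    }

    /** Hypothetical source that can only be cancelled. */
    static class CancelOnlySource implements CancellableSource {
        @Override
        public void cancel() {
            // Nothing to release in this sketch.
        }
    }

    /** Assumed 1.6 rule: stoppable only if every source implements the stop interface. */
    static boolean isStoppable(List<? extends CancellableSource> sources) {
        for (CancellableSource source : sources) {
            if (!(source instanceof StoppableFunction)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isStoppable(Arrays.asList(new DemoSource())));
        System.out.println(isStoppable(Arrays.asList(new DemoSource(), new CancelOnlySource())));
    }
}
```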

So what is the difference between cancel and stop then? It is more about how you terminate the whole DAG. On cancelling, you call cancel() on all tasks more or less simultaneously. Stopping is more of a fine-grained cancel: you first stop the sources and then let the tasks close themselves once all upstream tasks are done. Just before closing the tasks, you also take a snapshot. Thus, the difference should not be visible in user code but only in the Flink code itself (task/checkpoint coordinator).
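
The two termination orders described above can be mimicked with a small single-threaded toy model in plain Java. This is only a sketch of the description, not Flink's task or checkpoint coordinator logic; Task, demoPipeline and the log strings are hypothetical, and the pipeline is assumed to be linear (source first, sink last).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class StopVsCancelSketch {

    /** Hypothetical task in a linear pipeline (source first, sink last). */
    static class Task {
        final String name;
        // Records received from upstream but not yet processed.
        final Deque<Integer> buffered = new ArrayDeque<>();

        Task(String name, Integer... records) {
            this.name = name;
            buffered.addAll(Arrays.asList(records));
        }
    }

    /** Cancel: all tasks are terminated at about the same time; buffered records are abandoned. */
    static List<String> cancel(List<Task> pipeline) {
        List<String> log = new ArrayList<>();
        for (Task task : pipeline) {
            log.add("cancel " + task.name + ", dropped " + task.buffered.size());
            task.buffered.clear();
        }
        return log;
    }

    /** Stop: starting at the source, each task drains its input, snapshots, then closes. */
    static List<String> stop(List<Task> pipeline) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < pipeline.size(); i++) {
            Task task = pipeline.get(i);
            int processed = 0;
            while (!task.buffered.isEmpty()) {
                Integer record = task.buffered.poll();
                if (i + 1 < pipeline.size()) {
                    // Hand the record to the next task before this one closes.
                    pipeline.get(i + 1).buffered.add(record);
                }
                processed++;
            }
            log.add("snapshot " + task.name);
            log.add("close " + task.name + ", processed " + processed);
        }
        return log;
    }

    /** Source with nothing buffered, a map task holding two records, a sink holding one. */
    static List<Task> demoPipeline() {
        return Arrays.asList(new Task("source"), new Task("map", 1, 2), new Task("sink", 3));
    }

    public static void main(String[] args) {
        cancel(demoPipeline()).forEach(System.out::println);
        stop(demoPipeline()).forEach(System.out::println);
    }
}
```

With the demo pipeline, cancel drops the three in-flight records, while stop lets the sink process all three before it closes.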

So for your questions:
1. No, as stop() and cancel() are the same thing at the task (UDF) level.
2. Yes, stop is more graceful and creates a snapshot. [2]
3. Not that I am aware of. In the whole Flink code base there are no more (see the javadoc). You could of course check whether there are some in Bahir, but it shouldn't really matter: there is no huge difference between stopping and cancelling if you wait for a checkpoint to finish.
4. Okay, you answered your second question ;) Yes, cancel with savepoint = stop now, to make it easier for new users.

[1] https://github.com/apache/flink/blob/release-1.6/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java#L180-L190
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html

On Sun, Jun 7, 2020 at 1:04 AM M Singh <ma...@yahoo.com> wrote:

Hi Arvid:

Thanks for the links.

A few questions:

1. Is there any particular interface in 1.9+ that identifies a source as stoppable?
2. Is there any distinction between stop and cancel in 1.9+?
3. Is there any list of sources which are documented as stoppable besides the ones listed in your SO link?
4. In 1.9+ there is a flink stop command and a flink cancel command (https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#stop). So it appears that flink stop will take a savepoint and then call cancel, and cancel will just cancel the job (it looks like cancel with savepoint is deprecated in 1.10).

Thanks again for your help.



On Saturday, June 6, 2020, 02:18:57 PM EDT, Arvid Heise <ar...@ververica.com> wrote:


Yes, it seems as if FlinkKinesisConsumer does not implement it.

Here are the links to the respective javadoc [1] and code [2]. Note that in later releases (1.9+) this interface has been removed. Stop is now implemented through a cancel() at the source level.

In general, I don't think that stop is needed in a Kinesis-to-Kinesis use case anyway, since no additional consistency is expected over a normal cancel.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/api/common/functions/StoppableFunction.html
[2] https://github.com/apache/flink/blob/release-1.6/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java

On Sat, Jun 6, 2020 at 8:03 PM M Singh <ma...@yahoo.com> wrote:
Hi Arvid:

I checked the link and it indicates that only Storm SpoutSource, TwitterSource and NifiSource support stop.

Does this mean that FlinkKinesisConsumer is not stoppable?

Also, can you please point me to the Stoppable interface mentioned in the link? I found the following but am not sure if TwitterSource implements it:
https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java

Thanks





On Friday, June 5, 2020, 02:48:49 PM EDT, Arvid Heise <ar...@ververica.com> wrote:

Hi,

could you check if this SO thread [1] helps you already?

[1] https://stackoverflow.com/questions/53735318/flink-how-to-solve-error-this-job-is-not-stoppable


--

Arvid Heise | Senior Java Developer

Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng



Re: Stopping a job

Posted by Kostas Kloudas <kk...@gmail.com>.
What Arvid said is correct.
The only thing I have to add is that "stop" also allows exactly-once sinks
to push out their buffered data to their final destination (e.g.
Filesystem). In other words, it takes side effects into account, so it
guarantees exactly-once end-to-end, assuming that you are
using exactly-once sources and sinks. Cancel with savepoint, on the other
hand, did not necessarily do so: committing side effects followed a
"best-effort" approach.

For more information you can check [1].

Cheers,
Kostas

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212
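
As a rough illustration of the side-effect point above, here is a toy Java model. It is not Flink code: BufferingSink, pending and committed are hypothetical names, and the sketch simply assumes a sink that buffers records and only makes them visible on an explicit commit. In this model the graceful stop path commits the buffer before shutting down, while the cancel path leaves it pending.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these types are Flink classes.
class BufferingSinkSketch {

    /** Hypothetical sink that buffers records until they are committed. */
    static class BufferingSink {
        private final List<String> pending = new ArrayList<>();
        private final List<String> committed = new ArrayList<>();

        void write(String record) {
            pending.add(record);
        }

        /** Graceful path: push buffered records to the final destination, then shut down. */
        void stop() {
            committed.addAll(pending);
            pending.clear();
        }

        /** Abrupt path in this model: shut down without committing what is still buffered. */
        void cancel() {
            // Intentionally empty: pending records stay uncommitted.
        }

        List<String> committed() {
            return committed;
        }

        List<String> pending() {
            return pending;
        }
    }

    /** Writes two records, then terminates the sink via stop (graceful) or cancel. */
    static BufferingSink run(boolean graceful) {
        BufferingSink sink = new BufferingSink();
        sink.write("a");
        sink.write("b");
        if (graceful) {
            sink.stop();
        } else {
            sink.cancel();
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println("stop   -> committed " + run(true).committed());
        System.out.println("cancel -> committed " + run(false).committed()
                + ", still pending " + run(false).pending());
    }
}
```

Whether pending data is later committed after a restore depends on the real sink and is outside what this sketch tries to show.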


Re: Stopping a job

Posted by Arvid Heise <ar...@ververica.com>.
This was before I joined the dev team, so the following is somewhat
speculative:

The concept of stoppable functions never really took off, as it was a bit of
a clumsy approach. There is no fundamental difference between stopping and
cancelling on (sub)task level. Indeed, if you look at the Twitter source of
1.6 [1], cancel() and stop() do exactly the same thing. I'd assume
that this is probably true for all sources.

So what is the difference between cancel and stop then? It's more about
how you terminate the whole DAG. On cancelling, you call cancel() on all
tasks more or less simultaneously. If you want to stop, it's more of a
fine-grained cancel, where you first stop the sources and then let the tasks
close themselves when all upstream tasks are done. Just before closing the
tasks, you also take a snapshot. Thus, the difference should not be visible
in user code but only in the Flink code itself (task/checkpoint coordinator).

So for your questions:
1. No, as stop() and cancel() are the same thing on task (UDF) level.
2. Yes, stop is more graceful and creates a snapshot. [2]
3. Not that I am aware of. In the whole Flink code base, there are no others
(see javadoc). You could of course check whether there are some in Bahir. But it
shouldn't really matter. There is no huge difference between stopping and
cancelling if you wait for a checkpoint to finish.
4. Okay, you answered your second question yourself ;) Yes, cancel with
savepoint = stop now, to make it easier for new users.

[1]
https://github.com/apache/flink/blob/release-1.6/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java#L180-L190
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html
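
The DAG-level difference described above can be sketched with a toy Java model of a linear pipeline (source -> map -> sink). This is only an illustration under simplifying assumptions, not Flink's task or checkpoint coordinator code: Task, cancelJob and stopJob are hypothetical names. At task level stop() and cancel() do the same thing; what differs is the order in which the job terminates its tasks and whether a snapshot is taken first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of a linear pipeline (source -> map -> sink); none of these types are Flink classes.
class StopVsCancelSketch {

    /** Hypothetical task; stop() and cancel() do the same thing at this level. */
    static class Task {
        final String name;
        boolean running = true;

        Task(String name) {
            this.name = name;
        }

        void cancel() {
            running = false;
        }

        void stop() {
            running = false;
        }
    }

    static List<Task> pipeline() {
        return Arrays.asList(new Task("source"), new Task("map"), new Task("sink"));
    }

    /** Cancel: call cancel() on every task, without waiting for upstream tasks. */
    static List<String> cancelJob(List<Task> tasks) {
        List<String> log = new ArrayList<>();
        for (Task task : tasks) {
            task.cancel();
            log.add("cancel " + task.name);
        }
        return log;
    }

    /** Stop: stop the source first, then snapshot and close each task in upstream-to-downstream order. */
    static List<String> stopJob(List<Task> tasks) {
        List<String> log = new ArrayList<>();
        Task source = tasks.get(0);
        source.stop();
        log.add("stop " + source.name);
        for (Task task : tasks) {
            log.add("snapshot " + task.name); // snapshot just before the task closes
            task.running = false;
            log.add("close " + task.name);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(cancelJob(pipeline()));
        System.out.println(stopJob(pipeline()));
    }
}
```

The event logs are only there to make the ordering visible; a real job is of course not a simple list of tasks.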




Re: Stopping a job

Posted by M Singh <ma...@yahoo.com>.
 
Hi Arvid:
Thanks for the links.
A few questions:
1. Is there any particular interface in 1.9+ that identifies the source as stoppable?
2. Is there any distinction between stop and cancel in 1.9+?
3. Is there any list of sources which are documented as stoppable besides the ones listed in your SO link?
4. In 1.9+ there is a flink stop command and a flink cancel command (https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#stop). So it appears that flink stop will take a savepoint and then call cancel, and cancel will just cancel the job (it looks like cancel with savepoint is deprecated in 1.10).
Thanks again for your help.



Re: Stopping a job

Posted by Arvid Heise <ar...@ververica.com>.
Yes, it seems as if FlinkKinesisConsumer does not implement it.

Here are the links to the respective javadoc [1] and code [2]. Note that in
later releases (1.9+) this interface has been removed. Stop is now
implemented through a cancel() on source level.

In general, I don't think that stop is needed in a Kinesis-to-Kinesis use
case anyway, since there is no additional consistency expected over a
normal cancel.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/api/common/functions/StoppableFunction.html
[2]
https://github.com/apache/flink/blob/release-1.6/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
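
To make the "not stoppable" error concrete, here is a minimal Java sketch of the rule as I understand it for 1.6: a job counts as stoppable only if every source implements StoppableFunction. The two interfaces below are simplified local stand-ins, not the real Flink classes, and StoppableSource, CancelOnlySource and isStoppable are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Simplified local stand-ins for the Flink 1.6 interfaces; NOT the real Flink classes.
class StoppableCheckSketch {

    /** Stand-in for a source function: every source can be cancelled. */
    interface SourceFunction {
        void cancel();
    }

    /** Stand-in for StoppableFunction: sources opt in to STOP by implementing it. */
    interface StoppableFunction {
        void stop();
    }

    /** Hypothetical source that supports both signals; both do the same thing. */
    static class StoppableSource implements SourceFunction, StoppableFunction {
        private volatile boolean running = true;

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void stop() {
            running = false;
        }

        boolean isRunning() {
            return running;
        }
    }

    /** Hypothetical source that only supports cancel, like a consumer without StoppableFunction. */
    static class CancelOnlySource implements SourceFunction {
        private volatile boolean running = true;

        @Override
        public void cancel() {
            running = false;
        }

        boolean isRunning() {
            return running;
        }
    }

    /** In this sketch, a job is stoppable only if every source implements StoppableFunction. */
    static boolean isStoppable(List<? extends SourceFunction> sources) {
        for (SourceFunction source : sources) {
            if (!(source instanceof StoppableFunction)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isStoppable(Arrays.asList(new StoppableSource())));
        System.out.println(isStoppable(Arrays.asList(new StoppableSource(), new CancelOnlySource())));
    }
}
```

A single source without the interface is enough to make the whole job non-stoppable in this model, which matches the error reported for the Kinesis job.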




Re: Stopping a job

Posted by M Singh <ma...@yahoo.com>.
Hi Arvid:
I checked the link, and it indicates that only Storm SpoutSource, TwitterSource and NifiSource support stop.
Does this mean that FlinkKinesisConsumer is not stoppable?
Also, can you please point me to the Stoppable interface mentioned in the link? I found the following but am not sure if TwitterSource implements it:
https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
Thanks





Re: Stopping a job

Posted by Arvid Heise <ar...@ververica.com>.
Hi,

could you check if this SO thread [1] helps you already?

[1]
https://stackoverflow.com/questions/53735318/flink-how-to-solve-error-this-job-is-not-stoppable

On Thu, Jun 4, 2020 at 7:43 PM M Singh <ma...@yahoo.com> wrote:

> Hi:
>
> I am running a job which consumes data from Kinesis and sends data to
> another Kinesis queue.  I am using an older version of Flink (1.6), and
> when I try to stop the job I get an exception
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Job termination
> (STOP) failed: This job is not stoppable.]
>
>
> I wanted to find out what a stoppable job is and whether it is possible to
> make a job stoppable if it is reading/writing to Kinesis?
>
> Thanks
>
>
>
