Posted to user@flink.apache.org by Elias Levy <fe...@gmail.com> on 2017/09/12 01:17:03 UTC

Flink flick cancel vs stop

I was wondering about the status of the flink stop command.  At first blush
it would seem to be the preferable way to shut down a Flink job, but it
depends on StoppableFunction being implemented by sources, and I notice that
the Kafka source does not seem to implement it.  In addition, the command
does not support -s / --withSavepoint like cancel does.

Is stop deprecated?

Re: Flink flick cancel vs stop

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Yes, we are aware of this issue and we would like to have a clean shutdown soon, but at the moment it does not look like it will be ready for Flink 1.5.

Another solution is a Kafka exactly-once producer implemented on top of the GenericWriteAheadSink. It could avoid this issue (at the cost of significantly higher overhead). There are plans to implement such a producer as an alternative to the current one, but I do not know the timeline for that. It should be a relatively easy task and we would welcome such a contribution.

Piotrek
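
For illustration, the write-ahead idea mentioned above could look roughly like the sketch below. It is a toy model, not the GenericWriteAheadSink API: the class WriteAheadSketch, its method names, and the in-memory "external" list are assumptions made for the example. Records are held back per checkpoint and only written out once that checkpoint is reported complete, so no broker-side transaction has to stay open while the job is down.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the write-ahead idea; not the GenericWriteAheadSink API.
class WriteAheadSketch {

    static class WriteAheadSink {
        // Records received since the last checkpoint barrier.
        private List<String> buffer = new ArrayList<>();
        // Buffered records per checkpoint id, waiting for that checkpoint to complete.
        private final TreeMap<Long, List<String>> pending = new TreeMap<>();
        // Stand-in for the external system (for example a Kafka topic).
        final List<String> external = new ArrayList<>();

        void invoke(String record) {
            buffer.add(record);
        }

        // In a real sink, 'pending' would be stored in checkpointed state here.
        void snapshotState(long checkpointId) {
            pending.put(checkpointId, buffer);
            buffer = new ArrayList<>();
        }

        // Only now do records leave the job: everything up to checkpointId is written.
        void notifyCheckpointComplete(long checkpointId) {
            Iterator<Map.Entry<Long, List<String>>> it =
                pending.headMap(checkpointId, true).entrySet().iterator();
            while (it.hasNext()) {
                external.addAll(it.next().getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        WriteAheadSink sink = new WriteAheadSink();
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState(1L);
        sink.invoke("c");                    // belongs to the next checkpoint
        System.out.println(sink.external);   // nothing has been written yet
        sink.notifyCheckpointComplete(1L);
        System.out.println(sink.external);   // records of checkpoint 1 only
    }
}
```

The higher overhead shows up here as extra state (every record is stored before it is written) and as latency of at least one checkpoint interval.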

> On 14 Dec 2017, at 01:43, Elias Levy <fe...@gmail.com> wrote:
> 
> I am re-upping this thread now that FlinkKafkaProducer011 is out.  The new producer, when used with exactly-once semantics, has the rather troublesome behavior that it will fall back to at-most-once, rather than at-least-once, if the job is down for longer than the Kafka broker's transaction.max.timeout.ms setting.
> 
> In situations that require extended maintenance downtime, this behavior is nearly certain to lead to message loss, as canceling a job while taking a savepoint will not wait for the Kafka transactions to be committed and is not atomic.
> 
> So it seems like there is a need for an atomic stop or cancel with savepoint that waits for transactional sinks to commit and then immediately stops any further message processing.


Re: Flink flick cancel vs stop

Posted by Elias Levy <fe...@gmail.com>.
I am re-upping this thread now that FlinkKafkaProducer011 is out.  The new
producer, when used with exactly-once semantics, has the rather
troublesome behavior that it will fall back to at-most-once, rather than
at-least-once, if the job is down for longer than the Kafka broker's
transaction.max.timeout.ms setting.

In situations that require extended maintenance downtime, this behavior is
nearly certain to lead to message loss, as canceling a job while taking a
savepoint will not wait for the Kafka transactions to be committed and is
not atomic.

So it seems like there is a need for an atomic stop or cancel with
savepoint that waits for transactional sinks to commit and then immediately
stops any further message processing.
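
The ordering asked for above can be sketched with a toy model. Everything in it is hypothetical (the classes TransactionalSink and Job, and the method stopWithSavepoint, are not Flink API); it only shows the intended sequence: stop processing, pre-commit as part of the savepoint, and return only after the sink has committed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of an atomic "stop with savepoint"; not Flink API.
class StopWithSavepointSketch {

    /** Toy two-phase-commit sink: records become visible only after commit(). */
    static class TransactionalSink {
        private final List<String> open = new ArrayList<>();
        private final List<String> preCommitted = new ArrayList<>();
        final List<String> committed = new ArrayList<>();

        void write(String record) {
            open.add(record);
        }

        /** Phase 1: called when the savepoint is taken. */
        void preCommit() {
            preCommitted.addAll(open);
            open.clear();
        }

        /** Phase 2: called once the savepoint is known to be complete. */
        void commit() {
            committed.addAll(preCommitted);
            preCommitted.clear();
        }
    }

    /** Toy job that accepts records until it is stopped. */
    static class Job {
        final TransactionalSink sink = new TransactionalSink();
        private boolean running = true;

        /** Returns false, and writes nothing, once the job has been stopped. */
        boolean process(String record) {
            if (!running) {
                return false;
            }
            sink.write(record);
            return true;
        }

        /** Stops processing and returns only after the sink has committed. */
        void stopWithSavepoint() {
            running = false;   // 1. no further records are processed
            sink.preCommit();  // 2. savepoint: close the open transaction
            sink.commit();     // 3. commit before the job shuts down
        }
    }

    public static void main(String[] args) {
        Job job = new Job();
        job.process("a");
        job.process("b");
        job.stopWithSavepoint();
        System.out.println("accepted after stop: " + job.process("c"));
        System.out.println("committed: " + job.sink.committed);
    }
}
```

With cancel-with-savepoint as described in this message, step 3 is not guaranteed to happen before the job goes away, which is where the transaction timeout becomes a problem.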


On Tue, Oct 24, 2017 at 4:46 AM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> I would propose that implementations of NewSource be
> non-blocking/asynchronous. For example, something like
>
> public abstract Future<T> getCurrent();
>
> This would allow us to perform certain actions while there is no
> data available to process (for example, flush output buffers). Something
> like this came up recently when we were discussing possible future changes
> in the network stack. It wouldn’t complicate the API by a lot, since the
> default implementation could just be:
>
> public Future<T> getCurrent() {
>   return completedFuture(getCurrentBlocking());
> }
>
> Another thing to consider is that maybe we would like to leave the door open
> for fetching records in batches from the source’s input buffers.
> Source functions (like Kafka) have some internal buffers, and it would be
> more efficient to read/deserialise all data present in the input buffer
> at once, instead of paying synchronisation/virtual method call/etc. costs
> once per record.
>
> Piotrek

Re: Flink flick cancel vs stop

Posted by Piotr Nowojski <pi...@data-artisans.com>.
I would propose that implementations of NewSource be non-blocking/asynchronous. For example, something like

public abstract Future<T> getCurrent();

This would allow us to perform certain actions while there is no data available to process (for example, flush output buffers). Something like this came up recently when we were discussing possible future changes in the network stack. It wouldn’t complicate the API by a lot, since the default implementation could just be:

public Future<T> getCurrent() {
  return completedFuture(getCurrentBlocking());
}

Another thing to consider is that maybe we would like to leave the door open for fetching records in batches from the source’s input buffers. Source functions (like Kafka) have some internal buffers, and it would be more efficient to read/deserialise all data present in the input buffer at once, instead of paying synchronisation/virtual method call/etc. costs once per record.

Piotrek
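
A self-contained version of the two snippets above might look like the following sketch. It assumes java.util.concurrent.CompletableFuture for completedFuture; the outer class AsyncSourceSketch and the toy CountingSource are made up for the example, and NewSource here is a cut-down stand-in for the interface under discussion, not an existing Flink class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical sketch; none of these names are actual Flink API.
class AsyncSourceSketch {

    /** Pull-style source whose getCurrent() may complete later. */
    abstract static class NewSource<T> {
        /** Blocking read; simple connectors could implement only this. */
        protected abstract T getCurrentBlocking() throws Exception;

        /** Default: wrap the blocking call in an already completed future. */
        public Future<T> getCurrent() {
            try {
                return CompletableFuture.completedFuture(getCurrentBlocking());
            } catch (Exception e) {
                CompletableFuture<T> failed = new CompletableFuture<>();
                failed.completeExceptionally(e);
                return failed;
            }
        }
    }

    /** Toy source that counts up from 0. */
    static class CountingSource extends NewSource<Integer> {
        private int next = 0;

        @Override
        protected Integer getCurrentBlocking() {
            return next++;
        }
    }

    public static void main(String[] args) throws Exception {
        NewSource<Integer> source = new CountingSource();
        for (int i = 0; i < 3; i++) {
            Future<Integer> record = source.getCurrent();
            if (record.isDone()) {
                System.out.println("record: " + record.get());
            } else {
                // A driver could flush output buffers here instead of blocking.
                System.out.println("no data available yet");
            }
        }
    }
}
```

A truly asynchronous connector would override getCurrent() and complete the future from its fetcher thread; the default keeps existing blocking code working unchanged.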

> On 22 Sep 2017, at 11:13, Aljoscha Krettek <al...@apache.org> wrote:
> 
> @Eron Yes, that would be the difference in characterisation. I think technically all sources could be transformed that way by pushing data into a (blocking) queue and having the "getElement()" method pull from that.


Re: Flink flick cancel vs stop

Posted by Aljoscha Krettek <al...@apache.org>.
@Eron Yes, that would be the difference in characterisation. I think technically all sources could be transformed that way by pushing data into a (blocking) queue and having the "getElement()" method pull from that.
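
A minimal sketch of that transformation, assuming a bounded java.util.concurrent.ArrayBlockingQueue between the two sides. PullAdapter and its push method are hypothetical names; only getElement is taken from the message above, and the timeout parameter is an addition for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; PullAdapter is not a Flink class.
class QueueAdapterSketch {

    /** A push-style producer fills the queue; the caller pulls from it. */
    static class PullAdapter<T> {
        private final BlockingQueue<T> queue;

        PullAdapter(int capacity) {
            this.queue = new ArrayBlockingQueue<>(capacity);
        }

        /** Called by the push-style source thread; blocks while the queue is full. */
        void push(T element) throws InterruptedException {
            queue.put(element);
        }

        /** Called by the driver; returns null if nothing arrives within the timeout. */
        T getElement(long timeoutMillis) throws InterruptedException {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        PullAdapter<String> adapter = new PullAdapter<>(16);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    adapter.push("record-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        producer.join();

        String record;
        while ((record = adapter.getElement(100)) != null) {
            System.out.println(record);
        }
    }
}
```

The bounded capacity gives back-pressure for free: a producer that runs ahead of the driver simply blocks in push.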

> On 15. Sep 2017, at 20:17, Elias Levy <fe...@gmail.com> wrote:
> 
> On Fri, Sep 15, 2017 at 10:02 AM, Eron Wright <eronwright@gmail.com> wrote:
>> Aljoscha, would it be correct to characterize your idea as a 'pull' source rather than the current 'push'?  It would be interesting to look at the existing connectors to see how hard it would be to reverse their orientation.   e.g. the source might require a buffer pool.
> 
> The Kafka client works that way.  As does the QueueingConsumer used by the RabbitMQ source.  The Kinesis and NiFi sources also seem to poll. Those are all the bundled sources.


Re: Flink flick cancel vs stop

Posted by Elias Levy <fe...@gmail.com>.
On Fri, Sep 15, 2017 at 10:02 AM, Eron Wright <er...@gmail.com> wrote:

> Aljoscha, would it be correct to characterize your idea as a 'pull' source
> rather than the current 'push'?  It would be interesting to look at the
> existing connectors to see how hard it would be to reverse their
> orientation.   e.g. the source might require a buffer pool.
>

The Kafka client works that way.  As does the QueueingConsumer used by the
RabbitMQ source.  The Kinesis and NiFi sources also seem to poll. Those
are all the bundled sources.

Re: Flink flick cancel vs stop

Posted by Eron Wright <er...@gmail.com>.
Aljoscha, would it be correct to characterize your idea as a 'pull' source
rather than the current 'push'?  It would be interesting to look at the
existing connectors to see how hard it would be to reverse their
orientation.   e.g. the source might require a buffer pool.

On Fri, Sep 15, 2017 at 9:05 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Also relevant for this discussion: several people (including me) have by
> now floated the idea of reworking the source interface to take the
> responsibility for stopping/canceling/continuing away from the specific
> source implementation and instead give that power to the system. Currently,
> each source basically does this:
>
> class Source<T> {
>   public void run(Context ctx, Lock lock) {
>     while (true) { // "forever, as long as I want, and I don't care"
>       synchronized (lock) {
>         T output = ReadFrom.externalSystem();
>         updateReadPositionState();
>         ctx.collect(output);
>       }
>     }
>   }
> }
>
> Meaning that any stopping/canceling behaviour requires cooperation from
> the source implementation.
>
> This would be a different idea for a source interface:
>
> abstract class NewSource<T> {
>   public abstract boolean start();
>   public abstract boolean advance();
>   public abstract void close();
>
>   public abstract T getCurrent();
>   public abstract Instant getCurrentTimestamp();
>   public abstract Instant getWatermark();
>
>   public abstract CheckpointMark getCheckpointMark();
> }
>
> Here the driver would sit outside and call the source whenever data should
> be provided. Stop/cancel would not be a feature of the source function but
> of the code that calls it.
>
> Best,
> Aljoscha

Re: Flink flick cancel vs stop

Posted by Aljoscha Krettek <al...@apache.org>.
Also relevant for this discussion: several people (including me) have by now floated the idea of reworking the source interface to take the responsibility for stopping/canceling/continuing away from the specific source implementation and instead give that power to the system. Currently, each source basically does this:

class Source<T> {
  public void run(Context ctx, Lock lock) {
    while (true) { // "forever, as long as I want, and I don't care"
      synchronized (lock) {
        T output = ReadFrom.externalSystem();
        updateReadPositionState();
        ctx.collect(output);
      }
    }
  }
}

Meaning that any stopping/canceling behaviour requires cooperation from the source implementation.

This would be a different idea for a source interface:

abstract class NewSource<T> {
  public abstract boolean start();
  public abstract boolean advance();
  public abstract void close();

  public abstract T getCurrent();
  public abstract Instant getCurrentTimestamp();
  public abstract Instant getWatermark();

  public abstract CheckpointMark getCheckpointMark();
}

Here the driver would sit outside and call the source whenever data should be provided. Stop/cancel would not be a feature of the source function but of the code that calls it.

Best,
Aljoscha
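
To make the "driver sits outside" idea concrete, here is a minimal sketch of such a driver loop. It uses a reduced form of the interface above (timestamps, watermarks and checkpoint marks are left out), and the names DriverSketch, ListSource and drive are hypothetical. The point is only that the stop flag is checked by the caller, so the source needs no stop/cancel logic of its own.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch; not Flink API.
class DriverSketch {

    /** Reduced version of the proposed interface. */
    interface NewSource<T> {
        boolean start();    // true if a first record is available
        boolean advance();  // true if another record is available
        T getCurrent();
        void close();
    }

    /** Toy source backed by a list. */
    static class ListSource<T> implements NewSource<T> {
        private final Iterator<T> iterator;
        private T current;

        ListSource(List<T> data) {
            this.iterator = data.iterator();
        }

        public boolean start() {
            return advance();
        }

        public boolean advance() {
            if (!iterator.hasNext()) {
                return false;
            }
            current = iterator.next();
            return true;
        }

        public T getCurrent() {
            return current;
        }

        public void close() {
        }
    }

    /** The driver owns the loop; clearing 'running' stops it between two records. */
    static <T> void drive(NewSource<T> source, AtomicBoolean running, Consumer<T> out) {
        try {
            boolean hasRecord = running.get() && source.start();
            while (hasRecord) {
                out.accept(source.getCurrent());
                // The flag is checked before advancing, so no record is read and then dropped.
                hasRecord = running.get() && source.advance();
            }
        } finally {
            source.close();
        }
    }

    public static void main(String[] args) {
        AtomicBoolean running = new AtomicBoolean(true);
        NewSource<Integer> source = new ListSource<>(Arrays.asList(1, 2, 3, 4));
        drive(source, running, record -> {
            System.out.println("emit " + record);
            if (record == 2) {
                running.set(false);  // simulated stop request from outside
            }
        });
    }
}
```

In a real system the flag would be flipped by the runtime at a checkpoint boundary rather than by the record consumer.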

> On 14. Sep 2017, at 20:03, Eron Wright <er...@gmail.com> wrote:
> 
> I too am curious about stop vs cancel.  I'm trying to understand the motivations a bit more.
> 
> The current behavior of stop is basically that the sources become bounded, leading to the job winding down.
> 
> The interesting question is how best to support 'planned' maintenance procedures such as app upgrade and scale changes.   I think a good enhancement could be to stop precisely at checkpoint time to prevent emission of spurious records.  Today the behavior of 'cancel w/ savepoint' is at-least-once because the two operations aren't atomic.  Earlier I had assumed that 'stop' would evolve in this direction but I suppose we could improve the atomicity of 'cancel w/ savepoint' rather than implicating 'stop'.
> 
> A different direction for 'stop' might be to improve the determinism of bounding a streaming job such that the stop point is well-understood in terms of the source.  For example, stopping at an offset provided as a stop parameter.   Today I suppose one would rely on external state to remember the stop point, e.g. FlinkKafkaConsumer010::setStartFromGroupOffsets.


Re: Flink flick cancel vs stop

Posted by Eron Wright <er...@gmail.com>.
I too am curious about stop vs cancel.  I'm trying to understand the
motivations a bit more.

The current behavior of stop is basically that the sources become bounded,
leading to the job winding down.

The interesting question is how best to support 'planned' maintenance
procedures such as app upgrade and scale changes.   I think a good
enhancement could be to stop precisely at checkpoint time to prevent
emission of spurious records.  Today the behavior of 'cancel w/ savepoint'
is at-least-once because the two operations aren't atomic.  Earlier I had
assumed that 'stop' would evolve in this direction but I suppose we could
improve the atomicity of 'cancel w/ savepoint' rather than implicating
'stop'.

A different direction for 'stop' might be to improve the determinism of
bounding a streaming job such that the stop point is well-understood in
terms of the source.  For example, stopping at an offset provided as a stop
parameter.   Today I suppose one would rely on external state to remember
the stop point, e.g. FlinkKafkaConsumer010::setStartFromGroupOffsets.
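
As a rough illustration of the "stop at an offset" direction, the sketch below bounds a toy source at a given offset and reports where a restart should resume. The stop offset parameter, the class StopAtOffsetSketch and the method runUntil are all hypothetical; nothing like this is claimed to exist in Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; the stop offset parameter is not an existing Flink option.
class StopAtOffsetSketch {

    /** Result of a bounded run: what was emitted and where a restart should resume. */
    static class RunResult {
        final List<String> emitted;
        final long nextOffset;

        RunResult(List<String> emitted, long nextOffset) {
            this.emitted = emitted;
            this.nextOffset = nextOffset;
        }
    }

    /** Emits log[startOffset, stopOffset) and then ends, i.e. the source becomes bounded. */
    static RunResult runUntil(List<String> log, long startOffset, long stopOffset) {
        List<String> emitted = new ArrayList<>();
        long offset = startOffset;
        while (offset < stopOffset && offset < log.size()) {
            emitted.add(log.get((int) offset));
            offset++;
        }
        return new RunResult(emitted, offset);
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("r0", "r1", "r2", "r3", "r4");
        RunResult first = runUntil(log, 0, 3);                  // stop parameter: offset 3
        RunResult second = runUntil(log, first.nextOffset, 5);  // restart resumes exactly there
        System.out.println(first.emitted + " then " + second.emitted);
    }
}
```

Because the stop point is an explicit offset, the records emitted before the stop and after the restart neither overlap nor leave a gap.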

On Thu, Sep 14, 2017 at 1:03 AM, Ufuk Celebi <uc...@apache.org> wrote:

> Hey Elias,
>
> sorry for the delay here. No, stop is not deprecated but not fully
> implemented yet. One missing part is migration of the existing source
> functions as you say.
>
> Let me pull in Till for more details on this. @Till: Is there more
> missing than migrating the sources?
>
> Here is the PR and discussion for reference:
> https://github.com/apache/flink/pull/750
>
> I would also really love to see this fully implemented in Flink. I
> don't expect this to happen for the upcoming 1.4 release though.
>
> – Ufuk

Re: Flink flick cancel vs stop

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Elias,

sorry for the delay here. No, stop is not deprecated but not fully
implemented yet. One missing part is migration of the existing source
functions as you say.

Let me pull in Till for more details on this. @Till: Is there more
missing than migrating the sources?

Here is the PR and discussion for reference:
https://github.com/apache/flink/pull/750

I would also really love to see this fully implemented in Flink. I
don't expect this to happen for the upcoming 1.4 release though.

– Ufuk


On Wed, Sep 13, 2017 at 7:07 PM, Elias Levy <fe...@gmail.com> wrote:
> Anyone?

Re: Flink flick cancel vs stop

Posted by Elias Levy <fe...@gmail.com>.
Anyone?

On Mon, Sep 11, 2017 at 6:17 PM, Elias Levy <fe...@gmail.com>
wrote:

> I was wondering about the status of the flink stop command.  At first
> blush it would seem to be the preferable way to shut down a Flink job, but it
> depends on StoppableFunction being implemented by sources, and I notice that
> the Kafka source does not seem to implement it.  In addition, the command
> does not support -s / --withSavepoint like cancel does.
>
> Is stop deprecated?
>