Posted to user@flink.apache.org by wang xuchen <be...@gmail.com> on 2019/07/01 19:38:42 UTC

Hi Flink experts,

I am prototyping a real-time system that reads from a Kafka source with
Flink and calls out to an external system as part of the event processing.
One of the most important requirements is that reads from Kafka should
NEVER stall, even when some async external calls are slow and hold on to
certain Kafka offsets. At-least-once processing is good enough.

Currently, I am using AsyncIO with a thread pool of size 20. My
understanding is that if I use orderedWait with a large 'capacity',
consumption from Kafka should continue even if some external calls are
slow (holding the offsets), as long as the capacity is not exhausted.

(From my own reading of the Flink source code, the capacity of the
orderedWait function translates to the size of the
OrderedStreamElementQueue.)
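
The capacity behaviour described above can be sketched with a toy model in
plain Java. This is only an illustration under stated assumptions: the class
OrderedInFlightQueue and its methods tryPut and drainCompleted are
hypothetical names, not Flink's OrderedStreamElementQueue, and a real
operator would presumably block (producing back pressure) where this sketch
simply returns false.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy model only: NOT Flink's OrderedStreamElementQueue.
final class OrderedInFlightQueue<T> {
    private final int capacity;
    // Pending async results, kept in arrival order.
    private final Deque<CompletableFuture<T>> inFlight = new ArrayDeque<>();

    OrderedInFlightQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Accepts a pending result, or returns false when the capacity is exhausted. */
    boolean tryPut(CompletableFuture<T> pending) {
        if (inFlight.size() >= capacity) {
            return false; // assumption: a real operator would block here instead
        }
        inFlight.addLast(pending);
        return true;
    }

    /** Emits finished results from the head only, so output keeps input order. */
    List<T> drainCompleted() {
        List<T> out = new ArrayList<>();
        while (!inFlight.isEmpty() && inFlight.peekFirst().isDone()) {
            // join() rethrows if the call failed; error handling is left out of this sketch.
            out.add(inFlight.pollFirst().join());
        }
        return out;
    }

    int size() {
        return inFlight.size();
    }
}
```

In this model a single slow call at the head does not stop new records from
being accepted while capacity remains, but nothing behind it is emitted
until it completes.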

However, I expect that while the external calls are stuck, the stream
source should keep pulling from Kafka as long as there is still capacity,
but offsets after the stuck record should NOT be committed back to Kafka
(and the checkpoint should also stall to accommodate the stalled offsets?).

My observation is that if I set the capacity large enough (max_int / 100,
for instance), consumption does not stall (which is good), but the offsets
AFTER the stalled records were all committed back to Kafka, all
checkpoints succeeded, and no back pressure was incurred.

In this case, if some machines crash, how does Flink recover the stalled
offsets? Which checkpoint does Flink roll back to? I understand that
committing offsets back to Kafka is merely to show progress to external
monitoring tools, but I hope Flink does bookkeeping somewhere to record
that async call xyz has not returned and should be retried during recovery.

======

I've done some more experiments. It looks like Flink is able to recover
the record for which I called completeExceptionally, even if I use
'unorderedWait' on the async stream.

Which leads back to Fabian's earlier comment: 'Flink does not rely on Kafka
consumer offset to recover, committing offset to Kafka is merely to show
progress to external monitoring tools'.

I couldn't pinpoint the code that Flink uses to achieve this. Maybe
in-flight async invocations in 'UnorderedStreamElementQueue' are part of
the checkpoint, and Flink saves the actual payload for later replay?

Can anyone shed some light?

Re:

Posted by Bowen Li <bo...@gmail.com>.
Hi Xuchen,

Every email on our ML that asks a question **MUST** have a valid subject, to
facilitate archive searches in the future and to save people time when
deciding whether they can help answer your question just by glancing at the
subject in their email clients.

Though your question itself is well written, I don't think it's acceptable
to not have a well-written subject. Note that I bring this up not about
you as a person, but as a common practice everyone in the community
should follow.

Bowen

On Sun, Jul 7, 2019 at 1:19 PM Konstantin Knauf <ko...@ververica.com>
wrote:

> Hi Wang,
>
> you guessed correctly, the events are not replayed from Kafka, but are
> part of the state of the AsyncWaitOperator, and the requests are
> resubmitted by the AsyncWaitOperator in its open() method.
>
> Cheers,
>
> Konstantin
>
>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
>
> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2010
>
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>

Re:

Posted by Konstantin Knauf <ko...@ververica.com>.
Hi Wang,

you guessed correctly, the events are not replayed from Kafka, but are part
of the state of the AsyncWaitOperator, and the requests are resubmitted by
the AsyncWaitOperator in its open() method.

Cheers,

Konstantin
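
The mechanism described in this reply (in-flight records stored as operator
state and resubmitted on restore) can be illustrated with a toy model in
plain Java. It is a sketch under stated assumptions, not Flink's
AsyncWaitOperator: the class ToyAsyncOperator and the methods
processElement, emitCompleted, snapshotState and open are hypothetical
stand-ins, and the "checkpoint" is just a list held in memory.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Toy model only: NOT Flink's AsyncWaitOperator.
final class ToyAsyncOperator<IN, OUT> {
    // Pairs an input record with the pending result of its async call.
    private static final class Entry<I, O> {
        final I input;
        final CompletableFuture<O> result;

        Entry(I input, CompletableFuture<O> result) {
            this.input = input;
            this.result = result;
        }
    }

    private final Function<IN, CompletableFuture<OUT>> asyncCall;
    private final List<Entry<IN, OUT>> inFlight = new ArrayList<>();

    ToyAsyncOperator(Function<IN, CompletableFuture<OUT>> asyncCall) {
        this.asyncCall = asyncCall;
    }

    /** Starts the async call and remembers the input until its result is emitted. */
    void processElement(IN input) {
        inFlight.add(new Entry<>(input, asyncCall.apply(input)));
    }

    /** Emits successfully finished results in any order (unordered mode). */
    List<OUT> emitCompleted() {
        List<OUT> out = new ArrayList<>();
        Iterator<Entry<IN, OUT>> it = inFlight.iterator();
        while (it.hasNext()) {
            Entry<IN, OUT> e = it.next();
            if (e.result.isDone() && !e.result.isCompletedExceptionally()) {
                out.add(e.result.join());
                it.remove();
            }
        }
        return out;
    }

    /** "Checkpoint": stores the inputs still in flight, not their futures. */
    List<IN> snapshotState() {
        List<IN> snapshot = new ArrayList<>();
        for (Entry<IN, OUT> e : inFlight) {
            snapshot.add(e.input);
        }
        return snapshot;
    }

    /** "Recovery": resubmits every input that was in flight at snapshot time. */
    void open(List<IN> restoredState) {
        for (IN input : restoredState) {
            processElement(input);
        }
    }
}
```

Because the snapshot in this model carries the pending inputs themselves,
the source offsets stored alongside it can already point past those
records: after a restore, the stuck records come back from operator state
rather than from Kafka.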




