Posted to user@beam.apache.org by Marco Costantini <ma...@rocketbnk.com> on 2021/09/29 22:28:15 UTC

How can I gracefully stop unbounded KafkaIO consumer?

Using a FlinkRunner, if I cancel the job, the pipeline blows up. Exceptions
stream across my screen rapidly.

```
java.lang.InterruptedException: null
        at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:944) ~[?:1.8.0_282]
        at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.nextBatch(KafkaUnboundedReader.java:584) ~[blob_p-b0501fca08dc4506cf9cffe14466df74e4d010e9-d1cc5178ec372501d7c1e755b90de159:?]
```
How can I gracefully stop my Flink+Beam job that uses an unbounded KafkaIO
source?

If it is not explicitly supported, are there any work-arounds?

Please and thank you,
Marco.

Re: How can I gracefully stop unbounded KafkaIO consumer?

Posted by Marco Costantini <ma...@rocketbnk.com>.
Thanks to Jan,

I needed checkpointing. But my checkpointing interval was also too short
(100 ms), which kept my Kafka reader alive long after I cancelled the job.
I set it to 1 second and it works as expected.
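
For later readers, a minimal sketch of setting that interval programmatically
(assuming Beam's FlinkPipelineOptions; this is the same knob as the
--checkpointingInterval flag Jan mentioned):

```
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CheckpointedPipeline {
  public static void main(String[] args) {
    // Same effect as passing --checkpointingInterval=1000 on the command line.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setCheckpointingInterval(1000L); // 1 second; 100 ms proved too aggressive

    Pipeline pipeline = Pipeline.create(options);
    // ... attach the KafkaIO read and the rest of the pipeline here, then:
    pipeline.run();
  }
}
```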

Thank you,
Marco.

Re: How can I gracefully stop unbounded KafkaIO consumer?

Posted by Marco Costantini <ma...@rocketbnk.com>.
Absolutely right. New findings.

Checkpointing was NOT configured (!!!). So I added it. And, as you
predicted, the exceptions no longer occur and the job cancels just fine,
with one caveat.

As I tail the TaskManager logs, they continue to stream:

```
2021-09-30 13:41:41,076 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_891889822_rktbi-dev-accounts-3, groupId=Reader-0_offset_consumer_891889822_rktbi-dev-accounts] Seeking to LATEST offset of partition accountflow.ba.create.dev-0
2021-09-30 13:41:41,121 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_891889822_rktbi-dev-accounts-3, groupId=Reader-0_offset_consumer_891889822_rktbi-dev-accounts] Resetting offset for partition accountflow.ba.create.dev-0 to offset 1.
```

I've waited about 2 minutes after the Flink web UI showed the job as fully
cancelled, and these lines still stream through the logs. Any ideas?

In any case, thank you very much for your help, Jan.

Marco

Re: How can I gracefully stop unbounded KafkaIO consumer?

Posted by Jan Lukavský <je...@seznam.cz>.
Do you set --checkpointingInterval? I have seen similar behavior, but 
only when checkpointing is disabled (missing), see [1].

[1] https://issues.apache.org/jira/browse/BEAM-12053

Re: How can I gracefully stop unbounded KafkaIO consumer?

Posted by Marco Costantini <ma...@rocketbnk.com>.
Thanks Jan,
More than resuming safely, I need to be able to stop the jobs safely.
Currently, doing so "blows up" the TaskManager, meaning that the exceptions
stream so fast that it shuts down for some unobserved reason: OOM? Disk space?

If I connect to Kafka with KafkaIO and then click Cancel Job -> boom (the
exceptions start streaming in the logs).

I've tried 'stopping' the job via the REST API, but it gives a response like
"the module is already in Finished state. Not stopping". It is correct in
that one of my two pipeline stages is finished, but the other is still RUNNING.
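
For context, a hedged sketch of triggering Flink's stop-with-savepoint over
REST instead (the endpoint exists in Flink 1.9+; the host, port, job id, and
savepoint directory below are hypothetical placeholders, so check the REST
docs for your Flink version):

```
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class StopWithSavepoint {
  public static void main(String[] args) throws Exception {
    // Hypothetical JobManager address and job id; substitute your own.
    URL url = new URL("http://localhost:8081/jobs/00000000000000000000000000000000/stop");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);

    // Take a savepoint, then stop; the directory must be writable by the cluster.
    String body = "{\"targetDirectory\": \"file:///tmp/savepoints\", \"drain\": false}";
    try (OutputStream os = conn.getOutputStream()) {
      os.write(body.getBytes(StandardCharsets.UTF_8));
    }

    // An accepted request returns a trigger id; poll it per the Flink REST docs
    // to learn when the savepoint completes and the job actually stops.
    System.out.println("HTTP " + conn.getResponseCode());
  }
}
```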

Any tips to clean this mess up?

Re: How can I gracefully stop unbounded KafkaIO consumer?

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Marco,

What is your intention? Do you want to upgrade the pipeline? Flink uses
checkpoints / savepoints (see [1]), so cancelling the pipeline with a savepoint
and then resuming from that savepoint should be safe. Another option is
to enable offset commits to Kafka via [2]. That way you should be able to
resume even without a savepoint (but you will lose any internal pipeline
state, so that is mostly useful for stateless pipelines).

Would that work for you?

  Jan

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/

[2] 
https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
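
A minimal sketch of option [2] (the broker address and group id are
hypothetical placeholders; the topic name is taken from the logs earlier in
this thread, and commitOffsetsInFinalize() needs an explicit group.id in the
consumer config):

```
import java.util.Collections;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

// 'pipeline' is built with checkpointing enabled, as in the sketch further up.
PCollection<KV<String, String>> records =
    pipeline.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")      // hypothetical broker
            .withTopic("accountflow.ba.create.dev")   // topic from the logs above
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // An explicit consumer group is required for offset commits:
            .withConsumerConfigUpdates(
                Collections.<String, Object>singletonMap(
                    ConsumerConfig.GROUP_ID_CONFIG, "my-beam-group"))
            // [2]: commit consumed offsets back to Kafka when a checkpoint
            // finalizes, so a restarted job can resume without a savepoint.
            .commitOffsetsInFinalize()
            .withoutMetadata());
```

If you take the savepoint route instead, recent Beam versions also expose a
savepointPath option on FlinkPipelineOptions for resuming from a savepoint.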
