Posted to user@beam.apache.org by "Kaymak, Tobias" <to...@ricardo.ch> on 2019/02/12 16:33:32 UTC

Dealing with "large" checkpoint state of a Beam pipeline in Flink

Hi,

my Beam 2.10-SNAPSHOT pipeline has a KafkaIO as input and a BigQueryIO
configured with FILE_LOADS as output. What bothers me is that even if I
configure in my Flink 1.6 configuration

state.backend: rocksdb
state.backend.incremental: true

I see states that are as big as 230 MiB and checkpoint timeouts, or
checkpoints that take longer than 10 minutes to complete (I just saw one
that took longer than 30 minutes).

Am I missing something? Is there some room for improvement? Should I use a
different storage backend for the checkpoints? (Currently they are stored
on GCS).

Best,
Tobi

Re: Dealing with "large" checkpoint state of a Beam pipeline in Flink

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
Thank you! I am using similar values, but my problem was that my FILE_LOADS
were sometimes failing, and this led to this behavior. The pipeline didn't
fail, though (which I had assumed it would); it simply retried the
load forever. (Retries were set to the default, -1.)
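
To illustrate the failure mode described above: a load step that retries
without limit never surfaces the error, whereas a bounded retry eventually
raises and lets the job fail visibly. What follows is a minimal, generic
Python sketch and not Beam or BigQueryIO code. The helper name
run_with_retries, its parameters, and the convention that a negative
max_retries means "retry forever" are assumptions made for illustration only.

```python
import time


def run_with_retries(load, max_retries=3, base_delay_s=0.0):
    """Call load() until it succeeds or the retry budget is used up.

    Hypothetical helper: a negative max_retries means "retry forever",
    which is the kind of setting that can hide a permanently failing load.
    """
    attempt = 0
    while True:
        try:
            return load()
        except Exception:
            # Give up once the (non-negative) retry budget is exhausted.
            if 0 <= max_retries <= attempt:
                raise
            attempt += 1
            # Exponential backoff between attempts: delay, 2*delay, 4*delay, ...
            time.sleep(base_delay_s * 2 ** (attempt - 1))
```

With max_retries=2 a load that always fails is attempted three times in
total and then the exception propagates; with a negative value the loop never
exits on failure, so the surrounding job keeps running without making progress.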

On Tue, Feb 12, 2019 at 6:11 PM Juan Carlos Garcia <jc...@gmail.com>
wrote:

> I forgot to mention that we use HDFS as storage for checkpoints /
> savepoints.
>
> Juan Carlos Garcia <jc...@gmail.com> wrote on Tue, Feb 12, 2019,
> 18:03:
>
>> Hi Tobias,
>>
>> I think this can happen when there is a lot of backpressure on the
>> pipeline.
>>
>> I don't know if it's normal, but I have a pipeline reading from KafkaIO
>> and pushing to BigQuery in streaming mode, and I have seen checkpoints of
>> almost 1 GB. Whenever I take a savepoint to update the pipeline, it can
>> go up to 8 GB of data.
>>
>> I am on Flink 1.5.x, on premises, also using RocksDB with incremental
>> checkpoints.
>>
>> So far my only solution to avoid errors while checkpointing or
>> savepointing is to make sure the checkpoint timeout is high enough, like
>> 20 or 30 minutes.

Re: Dealing with "large" checkpoint state of a Beam pipeline in Flink

Posted by Juan Carlos Garcia <jc...@gmail.com>.
I forgot to mention that we use HDFS as storage for checkpoints /
savepoints.

Re: Dealing with "large" checkpoint state of a Beam pipeline in Flink

Posted by Juan Carlos Garcia <jc...@gmail.com>.
Hi Tobias,

I think this can happen when there is a lot of backpressure on the
pipeline.

I don't know if it's normal, but I have a pipeline reading from KafkaIO and
pushing to BigQuery in streaming mode, and I have seen checkpoints of almost
1 GB. Whenever I take a savepoint to update the pipeline, it can
go up to 8 GB of data.

I am on Flink 1.5.x, on premises, also using RocksDB with incremental
checkpoints.

So far my only solution to avoid errors while checkpointing or savepointing
is to make sure the checkpoint timeout is high enough, like 20 or 30 minutes.
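
For a Beam pipeline on the Flink runner, the checkpoint timeout mentioned
above can be passed as a pipeline option. The sketch below only builds the
argument list. The option names --checkpointingInterval and
--checkpointTimeoutMillis are the Beam Java Flink runner's pipeline options
as far as I know (both in milliseconds), so please verify them against the
documentation for your Beam version. The helper name and the example values
are assumptions.

```python
def flink_checkpoint_args(timeout_minutes, interval_seconds):
    """Build Beam (Java) Flink runner arguments for checkpoint tuning.

    Hypothetical helper; both options are expressed in milliseconds.
    """
    if timeout_minutes <= 0 or interval_seconds <= 0:
        raise ValueError("timeout and interval must be positive")
    return [
        "--runner=FlinkRunner",
        f"--checkpointingInterval={int(interval_seconds * 1000)}",
        f"--checkpointTimeoutMillis={int(timeout_minutes * 60 * 1000)}",
    ]


# Example: checkpoint every 5 minutes, allow each checkpoint up to 30 minutes.
args = flink_checkpoint_args(timeout_minutes=30, interval_seconds=300)
```

A longer timeout only stops slow checkpoints from being discarded; it does
not address the cause of the slowness, such as backpressure or a sink that
keeps retrying.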

