Posted to user@beam.apache.org by "Kathula, Sandeep" <Sa...@intuit.com> on 2021/10/04 21:50:02 UTC

Beam program with Flink runner which can limit number of records processed in specified time when reading from Kafka

Hi,
    I have a Beam pipeline with the Flink runner which reads from Kafka, applies a 10-minute window, and writes the data in Parquet format to S3. It runs fine when everything goes well. But if, due to some issue, my pipeline stops running for an hour or two, then to catch up from the latest Flink checkpoint it reads data from Kafka at a very high rate and tries to dump it to S3 in Parquet format. Because the data processed in the latest 10-minute window is huge while catching up with the lag, it fails with an out-of-memory error and is never able to run successfully with my current resources. I saw that there is a Beam property called maxBundleSize through which we can control the maximum size of a bundle, but I didn't find any property to control the number of bundles processed within the window interval.
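
A minimal sketch of the pipeline shape being described, using the Beam Java SDK; the broker, topic, Avro schema, and S3 path below are placeholders rather than details from the actual pipeline:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.joda.time.Duration;

    public class KafkaToParquetSketch {
      // Trivial single-field schema, purely for illustration.
      private static final Schema SCHEMA = new Schema.Parser().parse(
          "{\"type\":\"record\",\"name\":\"Event\","
              + "\"fields\":[{\"name\":\"payload\",\"type\":\"string\"}]}");

      public static void main(String[] args) {
        // Run with e.g. --runner=FlinkRunner plus the usual Flink/S3 options.
        Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs(args).withValidation().create());

        p.apply("ReadKafka", KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092")            // placeholder
                .withTopic("events")                            // placeholder
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
            .apply(Values.<String>create())
            .apply("ToGenericRecord", MapElements
                .into(TypeDescriptor.of(GenericRecord.class))
                .via((String value) -> {
                  GenericRecord r = new GenericData.Record(SCHEMA);
                  r.put("payload", value);
                  return r;
                }))
            .setCoder(AvroCoder.of(SCHEMA))
            .apply("Window10Min",
                Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(10))))
            .apply("WriteParquet", FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(SCHEMA))
                .to("s3://my-bucket/output/")                   // placeholder
                .withSuffix(".parquet")
                .withNumShards(10));                            // fixed sharding for the unbounded source

        p.run();
      }
    }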

   I wanted to check if there is any way to limit the number of records processed within a window interval.
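
For reference, a sketch of how the bundle-related knobs are set on FlinkPipelineOptions (the values below are illustrative); as far as I know they cap the number of elements per bundle and how long a bundle stays open, not how many records fall into a given window:

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class BundleOptions {
      static FlinkPipelineOptions fromArgs(String[] args) {
        FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        // Maximum number of elements per bundle (same as --maxBundleSize=1000).
        options.setMaxBundleSize(1000L);
        // Maximum time a bundle stays open before being finalized, in milliseconds
        // (same as --maxBundleTimeMills=1000).
        options.setMaxBundleTimeMills(1000L);
        return options;
      }
    }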

Thanks,
Sandeep

Re: Beam program with Flink runner which can limit number of records processed in specified time when reading from Kafka

Posted by Siyu Lin <si...@unity3d.com>.
Hi Luke,

I had the same issue before. I was wondering whether it is possible to use a
different timestamp as the watermark. Thanks!

-siyu

Re: Beam program with Flink runner which can limit number of records processed in specified time when reading from Kafka

Posted by Luke Cwik <lc...@google.com>.
The issue seems to be that when doing a backfill, the Kafka source is using
wall time to estimate the watermark and is therefore producing massive amounts
of records for each interval window.

If you set the watermark estimator within the Kafka source to be based upon
data within the records, then your interval windows will have pretty consistent
sizes based upon how many records actually exist in those 10-minute windows.
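
A sketch of what that could look like with KafkaIO's timestamp policy support; extractEventTime, the broker, the topic name, and the two-minute delay are made up for illustration, and the assumption is that an event timestamp can be parsed out of each record:

    import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public class EventTimeKafkaRead {

      // Made-up helper: pull the event time out of the record. Here the payload is
      // assumed to be an ISO-8601 timestamp; a real pipeline would parse its own format.
      static Instant extractEventTime(KafkaRecord<String, String> record) {
        return Instant.parse(record.getKV().getValue());
      }

      static KafkaIO.Read<String, String> read() {
        return KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")   // placeholder
            .withTopic("events")                   // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Element timestamps and the watermark come from the records themselves,
            // so during a backfill event time only advances as fast as the data does.
            .withTimestampPolicyFactory(
                (topicPartition, previousWatermark) ->
                    new CustomTimestampPolicyWithLimitedDelay<String, String>(
                        EventTimeKafkaRead::extractEventTime,
                        Duration.standardMinutes(2),   // allowed out-of-orderness
                        previousWatermark));
      }
    }

If the producer already stamps records with event time, I believe KafkaIO's built-in withCreateTime(...) (or withLogAppendTime() for broker append time) gives a similar data-driven watermark without writing a custom policy.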
