Posted to user@beam.apache.org by Sachin Mittal <sj...@gmail.com> on 2023/05/12 14:03:39 UTC

Is there a way to generate a bounded sequence emitted at a particular rate?

Hi,
I want to emit a bounded sequence of numbers from 0 to n, but have the
downstream steps receive this sequence at a given rate.

This is needed so that we can rate-limit the HTTP requests made downstream.

Say we generate a sequence from 1 to 100; then downstream would make 100
such requests almost at the same time.

So, to add gaps between the requests, I am trying something like this.

Would code like this work?

pipeline
    .apply(GenerateSequence.from(0).to(100)
        .withRate(1, Duration.standardSeconds(5)))
    .apply(ParDo.of(new BatchDataLoad()))
    .apply(KinesisIO.write()
        .withStreamName(streamName)
        // other configs
    );


Somehow this does not seem to generate numbers at that rate (1 every 5
seconds); they all arrive at once.
It also looks like it may be creating an unbounded collection, and Kinesis
does not seem to be writing anything to the stream.

If not, is there another way to achieve this?
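One possible way to get the pacing without depending on how a runner handles withRate is to keep the sequence bounded (GenerateSequence.from(0).to(100) with no withRate) and block inside the step that makes the HTTP request. The sketch below shows only the pacing logic in plain Java; FixedIntervalThrottle and PacedSequence are hypothetical names, not Beam classes, and wiring it into a DoFn (create the throttle in @Setup, call acquire() at the start of @ProcessElement) is an assumption about how you might use it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not a Beam class): lets at most one acquire() call
 * return per interval. A DoFn could create one in @Setup and call acquire()
 * at the start of @ProcessElement, before making its HTTP request.
 */
final class FixedIntervalThrottle {
  private final long intervalNanos;
  private long nextSlotNanos; // earliest System.nanoTime() at which acquire() may return

  FixedIntervalThrottle(long interval, TimeUnit unit) {
    this.intervalNanos = unit.toNanos(interval);
    this.nextSlotNanos = System.nanoTime();
  }

  /** Blocks until the next slot is due, then reserves the following one. */
  synchronized void acquire() throws InterruptedException {
    long now = System.nanoTime();
    long waitNanos = nextSlotNanos - now;
    if (waitNanos > 0) {
      TimeUnit.NANOSECONDS.sleep(waitNanos);
    }
    // Base the next slot on the later of "now" and the planned slot, so an idle
    // gap does not accumulate a burst of immediately available slots.
    nextSlotNanos = Math.max(now, nextSlotNanos) + intervalNanos;
  }
}

/** Stand-in for a bounded sequence 0..n-1 passed through a pacing step. */
final class PacedSequence {
  static List<Long> emit(long n, FixedIntervalThrottle throttle) throws InterruptedException {
    List<Long> out = new ArrayList<>();
    for (long i = 0; i < n; i++) {
      throttle.acquire(); // wait for this element's slot
      out.add(i);         // in a DoFn, the HTTP call / output would happen here
    }
    return out;
  }
}
```

Note that the limit applies per throttle instance: if the runner executes several DoFn instances in parallel, the overall request rate is roughly the per-instance rate times the parallelism, so a strict global limit would also need the parallelism of that step to be bounded.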

Thanks
Sachin

Re: Is there a way to generate a bounded sequence emitted at a particular rate?

Posted by Pavel Solomin <p....@gmail.com>.
The Direct runner is meant to be a test-only runner, not a production
runner, and I don't know whether it behaves well when processing bulk data
in batch mode. Do you see the same issues when you run everything on the
Flink runner?

The Beam codebase has integration tests that use the Direct runner; they
cover both writing and reading:

- Legacy:
https://github.com/apache/beam/blob/master/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
- AWS SDK V2:
https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/testing/KinesisIOIT.java

Maybe they will help you set up yours.

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>





On Fri, 12 May 2023 at 16:48, Sachin Mittal <sj...@gmail.com> wrote:

Re: Is there a way to generate a bounded sequence emitted at a particular rate?

Posted by Sachin Mittal <sj...@gmail.com>.
I am using the Direct runner.

If I remove the .withRate(1, Duration.standardSeconds(5)) call, then
KinesisIO writes to Kinesis; however, it receives all the input records at
once and then throws:
*KPL Expiration reached while waiting in limiter*
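As far as I understand the Kinesis Producer Library (KPL), this error means records waited in the KPL's internal rate limiter longer than their time-to-live (the RecordTtl setting, 30000 ms by default), which fits all records arriving at once. A minimal sketch, assuming the legacy KPL-based KinesisIO.Write, whose withProducerProperties(Properties) passes KPL settings through; the values are illustrative only, and KplProducerProps is a hypothetical helper name.

```java
import java.util.Properties;

/**
 * Hypothetical helper: builds KPL settings related to the
 * "Expiration reached while waiting in limiter" error.
 * RecordTtl and RateLimit are KPL configuration keys; values are examples.
 */
final class KplProducerProps {
  static Properties build(long recordTtlMillis, int rateLimitPercent) {
    Properties props = new Properties();
    // How long a record may wait inside the KPL (including its rate limiter)
    // before it is failed. KPL default: 30000 ms.
    props.setProperty("RecordTtl", Long.toString(recordTtlMillis));
    // Maximum put rate per shard, as a percentage of the Kinesis shard limit.
    // KPL default: 150. Lowering it reduces throttling but makes records queue
    // longer, so it usually goes together with a larger RecordTtl.
    props.setProperty("RateLimit", Integer.toString(rateLimitPercent));
    return props;
  }
}

// With the legacy connector, the result would be passed as:
//   KinesisIO.write().withStreamName(streamName).withProducerProperties(props)
```

Raising RecordTtl only gives queued records more time; it does not spread the requests out, so it complements rather than replaces pacing the input.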

I suppose the Direct runner has certain limitations (I am only using it for
writing test cases).
The real job will run on the Flink runner.

Thanks
Sachin


On Fri, May 12, 2023 at 9:09 PM Pavel Solomin <p....@gmail.com> wrote:

> Hello!
>
> > this does not seem to be generating numbers at that rate which is 1 per
> 5 seconds but all at one time
>
> What runner do you use? I've seen that behavior of GenerateSequence only
> in Direct runner.
>
> > Also looks like it may be creating an unbounded collection and looks
> like kinesis is not writing anything to the stream.
>
> Never seen that happening, and I used KinesisIO quite a lot recently in my
> playgrounds - in the same way you use, generating sequences and writing to
> Kinesis. Can you share a full reproducible example of stuck KinesisIO?
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Fri, 12 May 2023 at 15:04, Sachin Mittal <sj...@gmail.com> wrote:
>
>> Hi,
>> I want to emit a bounded sequence of numbers from 0 to n but downstream
>> to receive this sequence at a given rate.
>>
>> This is needed so that we can rate limit the HTTP request downstream.
>>
>> Say if we generate sequence from 1 - 100 then downstream would make 100
>> such requests almost at the same time.
>>
>> So to add gaps I am trying something like this.
>>
>> Would a code like this work ?
>> pipeline
>> .apply(GenerateSequence.from(0).to(100).withRate(1, Duration.
>> standardSeconds(5)))
>> .apply(ParDo.of(new BatchDataLoad()))
>> .apply(KinesisIO.write()
>> .withStreamName(streamName)
>> // other configs
>> );
>>
>>
>> Somehow this does not seem to be generating numbers at that rate which is
>> 1 per 5 seconds but all at one time.
>> Also looks like it may be creating an unbounded collection and looks like
>> kinesis is not writing anything to the stream.
>>
>> If not then is there a way to achieve this?
>>
>> Thanks
>> Sachin
>>
>>

Re: Is there a way to generate a bounded sequence emitted at a particular rate?

Posted by Pavel Solomin <p....@gmail.com>.
Hello!

> this does not seem to be generating numbers at that rate which is 1 per 5
> seconds but all at one time

Which runner do you use? I've seen that behavior of GenerateSequence only
with the Direct runner.
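For reference, here is what "1 every 5 seconds" implies for the whole sequence. GenerateSequence.from(0).to(100) produces 0 to 99 (`to` is exclusive), so on a runner that honours the rate the last element cannot appear much before 99 × 5 s. The sketch models an ideal pacing schedule, not Beam's exact implementation; RateSchedule is a hypothetical name, and it uses java.time.Duration rather than the Joda Duration that Beam's withRate takes.

```java
import java.time.Duration;

/**
 * Hypothetical helper modelling an ideal pacing schedule (not Beam's exact
 * implementation): with elementsPerPeriod elements allowed per period, element
 * k (0-based) cannot be emitted before period number k / elementsPerPeriod starts.
 */
final class RateSchedule {
  static Duration earliestEmitTime(long k, long elementsPerPeriod, Duration period) {
    if (k < 0 || elementsPerPeriod <= 0) {
      throw new IllegalArgumentException("k must be >= 0 and elementsPerPeriod > 0");
    }
    return period.multipliedBy(k / elementsPerPeriod); // integer division
  }

  /** Earliest time the whole range [from, to) can be emitted (`to` exclusive). */
  static Duration earliestCompletion(long from, long to, long elementsPerPeriod, Duration period) {
    return earliestEmitTime(to - from - 1, elementsPerPeriod, period);
  }
}
```

For withRate(1, Duration.standardSeconds(5)) over [0, 100) this gives 495 seconds, a little over 8 minutes, so a run in which all elements show up almost immediately suggests the rate was not applied.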

> Also looks like it may be creating an unbounded collection and looks like
> kinesis is not writing anything to the stream.

I've never seen that happen, and I have used KinesisIO quite a lot recently
in my playgrounds, in the same way you do: generating sequences and writing
to Kinesis. Can you share a full reproducible example of KinesisIO getting
stuck?

Best Regards,
Pavel Solomin

On Fri, 12 May 2023 at 15:04, Sachin Mittal <sj...@gmail.com> wrote: