Posted to user@flink.apache.org by Bowen Li <bo...@offerupnow.com> on 2017/08/25 04:23:07 UTC

Even out the number of generated windows

Hi guys,

I have a question about how Flink generates windows.

We are using a 1-day sliding window with a 1-hour slide to count some
features of items based on event time. We have about 20 million items. We
observed that Flink only emits results at a fixed time each hour (e.g. 1am,
2am, 3am, or 1:15am, 2:15am, 3:15am with a 15-minute offset). That means
20 million windows/records are generated at the same time every hour, which
burns down our sink, while nothing is generated for the rest of that hour.
The pattern looks like this:

# generated windows
|
|    /\                  /\
|   /  \                /  \
|_/__\_______/__\_
                                 time

Is there any way to even out the number of generated windows/records in an
hour? Can we have evenly distributed generated load like this?

# generated windows
|
|
| ------------------------
|_______________
                                 time
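
A minimal sketch of why the output clusters like this, assuming sliding
event-time windows are aligned to the epoch plus the configured offset
rather than to each key's first event. The arithmetic follows, to the best
of my understanding, what Flink's TimeWindow.getWindowStartWithOffset and
the sliding window assigner compute; the class and method names below are
illustrative and not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowAlignmentSketch {
    static final long MINUTE = 60_000L;
    static final long HOUR = 60 * MINUTE;
    static final long DAY = 24 * HOUR;

    // Latest slide boundary (shifted by offset) at or before the timestamp.
    static long lastWindowStart(long timestamp, long offset, long slide) {
        return timestamp - (timestamp - offset + slide) % slide;
    }

    // End timestamps of every sliding window that contains the timestamp.
    static List<Long> windowEnds(long timestamp, long size, long slide, long offset) {
        List<Long> ends = new ArrayList<>();
        for (long start = lastWindowStart(timestamp, offset, slide);
                start > timestamp - size;
                start -= slide) {
            ends.add(start + size);
        }
        return ends;
    }

    public static void main(String[] args) {
        long offset = 15 * MINUTE;
        long eventA = 10 * HOUR + 20 * MINUTE; // an event at 10:20
        long eventB = 10 * HOUR + 55 * MINUTE; // another key's event at 10:55
        // Both events fall into the same 24 windows, so both keys fire together.
        System.out.println(windowEnds(eventA, DAY, HOUR, offset)
                .equals(windowEnds(eventB, DAY, HOUR, offset)));
    }
}
```

Because the window boundaries do not depend on the key, every key's window
closes at the same instant, which matches the hourly spike described above.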

Thanks,
Bowen

Re: Even out the number of generated windows

Posted by Bowen Li <bo...@offerupnow.com>.
That's exactly what I found yesterday! Thank you Aljoscha for confirming it!

On Mon, Aug 28, 2017 at 2:57 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi Bowen,
>
> There is no built-in TTL, but you can use a ProcessFunction to set a timer
> that clears state.
>
> ProcessFunction docs:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html
>
> Best,
> Aljoscha

Re: Even out the number of generated windows

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Bowen,

There is no built-in TTL, but you can use a ProcessFunction to set a timer that clears state.

ProcessFunction docs: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html
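
A rough sketch of the timer idea, written as plain Java so it runs without Flink on the classpath. It assumes the usual pattern: remember when a key was last updated, schedule a timer one TTL later, and clear the state only if that timer is still the latest one. In a real job the two maps would be keyed state and the returned timestamp would be passed to ctx.timerService().registerEventTimeTimer(...); the class and method names here are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class TimerExpirySketch {
    private final long ttlMillis;
    // Stand-ins for per-key state (hypothetical; keyed state in a real job).
    private final Map<String, Long> lastResult = new HashMap<>();
    private final Map<String, Long> lastModified = new HashMap<>();

    TimerExpirySketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Called for every record; returns the time at which a timer should fire.
    long update(String key, long result, long now) {
        lastResult.put(key, result);
        lastModified.put(key, now);
        return now + ttlMillis;
    }

    // Called when a timer fires; clears state only if no newer update exists.
    boolean onTimer(String key, long timerTimestamp) {
        Long modified = lastModified.get(key);
        if (modified != null && timerTimestamp == modified + ttlMillis) {
            lastResult.remove(key);
            lastModified.remove(key);
            return true;
        }
        return false; // a later update superseded this timer
    }

    boolean hasState(String key) {
        return lastResult.containsKey(key);
    }

    public static void main(String[] args) {
        TimerExpirySketch state = new TimerExpirySketch(1_000L);
        long firstTimer = state.update("item-1", 5L, 0L);
        long secondTimer = state.update("item-1", 6L, 400L);
        System.out.println(state.onTimer("item-1", firstTimer));  // stale timer
        System.out.println(state.onTimer("item-1", secondTimer)); // expires the key
    }
}
```

Comparing the timer timestamp against the last update avoids clearing state for a key that was refreshed after the timer was registered.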

Best,
Aljoscha

> On 27. Aug 2017, at 19:19, Bowen Li <bo...@offerupnow.com> wrote:
> 
> Hi Robert,
>     Thank you for the suggestion, I'll try that.
> 
>     On second thought, I can actually reduce the amount of generated output so that fewer records are sent to Kinesis.
> 
>     What I want to do is use Flink's state to keep track of the last computation result of a window for each key. If the latest computation result is the same as the previous one, my Flink job shouldn't emit a new record. However, that requires some expiration functionality so that the state won't grow indefinitely, as explained in https://issues.apache.org/jira/browse/FLINK-3089. Is there any way to expire keyed state by time?
> 
> Thanks,
> Bowen

Re: Even out the number of generated windows

Posted by Bowen Li <bo...@offerupnow.com>.
Hi Robert,
    Thank you for the suggestion, I'll try that.

    On second thought, I can actually reduce the amount of generated
output so that fewer records are sent to Kinesis.

    What I want to do is use Flink's state to keep track of the last
computation result of a window for each key. If the latest computation
result is the same as the previous one, my Flink job shouldn't emit a new
record. However, that requires some expiration functionality so that the
state won't grow indefinitely, as explained in
https://issues.apache.org/jira/browse/FLINK-3089. Is there any way to
expire keyed state by time?
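
To make the idea concrete, here is a small sketch of the "emit only when
the result changed" logic in plain Java. The map stands in for Flink keyed
state, and the class and method names are made up for illustration. It
deliberately has no expiration, which is exactly the gap described above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class ChangeOnlyEmitterSketch {
    // Last result seen per key (hypothetical stand-in for keyed state).
    private final Map<String, Long> lastEmitted = new HashMap<>();

    // Returns the result if it differs from the previous one for this key,
    // or an empty Optional if nothing new needs to be sent to the sink.
    Optional<Long> offer(String key, long result) {
        Long previous = lastEmitted.put(key, result);
        if (previous != null && previous.longValue() == result) {
            return Optional.empty();
        }
        return Optional.of(result);
    }

    public static void main(String[] args) {
        ChangeOnlyEmitterSketch emitter = new ChangeOnlyEmitterSketch();
        System.out.println(emitter.offer("item-1", 3L).isPresent()); // first result
        System.out.println(emitter.offer("item-1", 3L).isPresent()); // unchanged
        System.out.println(emitter.offer("item-1", 4L).isPresent()); // changed
    }
}
```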

Thanks,
Bowen



On Sun, Aug 27, 2017 at 5:41 AM, Robert Metzger <rm...@apache.org> wrote:

> Hi Bowen,
>
> I don't know what kind of relationship your company has with AWS, but maybe
> they are willing to look into the issue from their side.
>
> To throttle a stream, I would recommend just adding a map operation that
> calls "Thread.sleep(<ms>)" every n records.

Re: Even out the number of generated windows

Posted by Robert Metzger <rm...@apache.org>.
Hi Bowen,

I don't know what kind of relationship your company has with AWS, but maybe
they are willing to look into the issue from their side.

To throttle a stream, I would recommend just adding a map operation that
calls "Thread.sleep(<ms>)" every n records.
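
A minimal sketch of that idea in plain Java, so it runs without Flink. In
a job, the body of map() would sit inside a MapFunction placed right
before the sink; the class name, the parameters, and the pause counter are
assumptions made for this example. Blocking the operator thread slows the
operator down and lets backpressure build up upstream. Note that each
parallel instance would keep its own counter, so the effective rate scales
with the parallelism.

```java
final class SleepEveryNSketch {
    private final int everyN;       // pause after this many records
    private final long pauseMillis; // how long each pause lasts
    private long seen;
    private long pauses;            // only tracked to make the sketch observable

    SleepEveryNSketch(int everyN, long pauseMillis) {
        if (everyN <= 0) {
            throw new IllegalArgumentException("everyN must be positive");
        }
        this.everyN = everyN;
        this.pauseMillis = pauseMillis;
    }

    // Identity map that blocks the calling thread every everyN records.
    <T> T map(T record) throws InterruptedException {
        seen++;
        if (seen % everyN == 0) {
            Thread.sleep(pauseMillis);
            pauses++;
        }
        return record;
    }

    long pauses() {
        return pauses;
    }

    public static void main(String[] args) throws InterruptedException {
        SleepEveryNSketch throttle = new SleepEveryNSketch(1_000, 10L);
        for (int i = 0; i < 5_000; i++) {
            throttle.map("record-" + i);
        }
        System.out.println("pauses: " + throttle.pauses());
    }
}
```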

On Sat, Aug 26, 2017 at 4:11 AM, Bowen Li <bo...@offerupnow.com> wrote:

> Hi Robert,
> We use the Kinesis sink (FlinkKinesisProducer). The main pain point is the
> Kinesis Producer Library (KPL) that FlinkKinesisProducer uses.
>
> KPL is basically a Java wrapper around a C++ core. It's slow, unstable,
> crash-prone, and memory- and CPU-hungry (it sends traffic via HTTP), and it
> can't handle a high workload such as a few million records in a short period
> of time. Still, there are no other options for writing to Kinesis except KPL
> (the AWS Kinesis SDK is even slower), so I'm not blaming Flink for choosing
> KPL.
>
> Are there any recommended ways to "artificially throttle down the stream
> before the sink"? How can I add the throttling to Flink's fluent API?
>
> Thanks,
> Bowen

Re: Even out the number of generated windows

Posted by Bowen Li <bo...@offerupnow.com>.
Hi Robert,
We use the Kinesis sink (FlinkKinesisProducer). The main pain point is the
Kinesis Producer Library (KPL) that FlinkKinesisProducer uses.

KPL is basically a Java wrapper around a C++ core. It's slow, unstable,
crash-prone, and memory- and CPU-hungry (it sends traffic via HTTP), and it
can't handle a high workload such as a few million records in a short period
of time. Still, there are no other options for writing to Kinesis except KPL
(the AWS Kinesis SDK is even slower), so I'm not blaming Flink for choosing
KPL.

Are there any recommended ways to "artificially throttle down the stream
before the sink"? How can I add the throttling to Flink's fluent API?

Thanks,
Bowen


On Fri, Aug 25, 2017 at 2:31 PM, Robert Metzger <rm...@apache.org> wrote:

> Hi Bowen,
>
> (very nice graphics :) )
>
> I don't think you can do anything about the windows themselves (unless you
> are able to build the windows yourself using the ProcessFunction, playing
> some tricks because you know your data), so I would focus on reducing the
> pain of "burning down your sink".
> Are the spikes causing any issues with the sink? (What's the downstream
> system?)
> Would it make sense for you to artificially throttle down the stream before
> the sink, so that the records per second are limited to a certain rate?
> Since you are using event time, the window results will always be correct
> and consistent. From a business perspective, this will of course introduce
> additional latency (= results come in later).

Re: Even out the number of generated windows

Posted by Robert Metzger <rm...@apache.org>.
Hi Bowen,

(very nice graphics :) )

I don't think you can do anything about the windows themselves (unless you
are able to build the windows yourself using the ProcessFunction, playing
some tricks because you know your data), so I would focus on reducing the
pain of "burning down your sink".
Are the spikes causing any issues with the sink? (What's the downstream
system?)
Would it make sense for you to artificially throttle down the stream before
the sink, so that the records per second are limited to a certain rate?
Since you are using event time, the window results will always be correct
and consistent. From a business perspective, this will of course introduce
additional latency (= results come in later).
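
As a back-of-the-envelope illustration of that latency trade-off, the
sketch below assumes one output record per item per window firing, i.e.
20 million records per hourly burst as in the original post. The rate of
10,000 records per second is a made-up example value, and the class and
method names are illustrative only.

```java
final class ThrottleLatencySketch {
    // Seconds needed to drain a burst at a fixed output rate.
    static double drainSeconds(long records, double recordsPerSecond) {
        return records / recordsPerSecond;
    }

    // Rate needed to spread a burst evenly over the given number of seconds.
    static double rateToSpreadOver(long records, double seconds) {
        return records / seconds;
    }

    public static void main(String[] args) {
        long burst = 20_000_000L;
        // Spreading the burst over the full hour needs roughly 5,556 records/s.
        System.out.printf("rate for one hour: %.0f records/s%n",
                rateToSpreadOver(burst, 3_600.0));
        // At 10,000 records/s the last record leaves about 33 minutes late.
        System.out.printf("drain time at 10k/s: %.0f s%n",
                drainSeconds(burst, 10_000.0));
    }
}
```

Any rate below about 5,556 records per second would not finish one burst
before the next window fires, so the backlog would keep growing.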


On Fri, Aug 25, 2017 at 6:23 AM, Bowen Li <bo...@offerupnow.com> wrote:

> Hi guys,
>
> I have a question about how Flink generates windows.
>
> We are using a 1-day sliding window with a 1-hour slide to count some
> features of items based on event time. We have about 20 million items. We
> observed that Flink only emits results at a fixed time each hour (e.g. 1am,
> 2am, 3am, or 1:15am, 2:15am, 3:15am with a 15-minute offset). That means
> 20 million windows/records are generated at the same time every hour, which
> burns down our sink, while nothing is generated for the rest of that hour.
> The pattern looks like this:
>
> # generated windows
> |
> |    /\                  /\
> |   /  \                /  \
> |_/__\_______/__\_
>                                  time
>
> Is there any way to even out the number of generated windows/records in an
> hour? Can we have evenly distributed generated load like this?
>
> # generated windows
> |
> |
> | ------------------------
> |_______________
>                                  time
>
> Thanks,
> Bowen
>
>