Posted to user@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2017/09/14 14:43:03 UTC

Re: Flink doesn't free YARN slots after restarting

Sorry for my late answer, Bowen,

I think this only works if you implement your own WindowAssigner. With the
built-in sliding window this is not possible since all windows have the
same offset.
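
To make the offset point concrete, here is a minimal sketch in plain Java (no Flink dependency). It only reproduces the start-time arithmetic that, to my understanding, the built-in sliding assigner applies with a single shared offset. The class name `PerKeyWindowOffsets`, the helper `offsetFromFirstEvent`, and the idea of deriving the offset from an item's first event are assumptions for illustration; a real solution would put this logic into a custom WindowAssigner and would have to obtain the per-item offset from somewhere, for example from the element itself.

```java
import java.util.ArrayList;
import java.util.List;

public class PerKeyWindowOffsets {
    public static final long HOUR_MS = 60L * 60L * 1000L;
    public static final long DAY_MS = 24L * HOUR_MS;

    // Start of the latest window containing the timestamp, for a given
    // offset and slide. Assumes non-negative epoch timestamps.
    public static long lastWindowStart(long timestamp, long offset, long slide) {
        return timestamp - (timestamp - offset + slide) % slide;
    }

    // Hypothetical helper: use the position of an item's first event
    // within the slide interval as that item's offset.
    public static long offsetFromFirstEvent(long firstEventTs, long slide) {
        return Math.floorMod(firstEventTs, slide);
    }

    // Start timestamps of all sliding windows that contain the timestamp.
    public static List<Long> windowStarts(long timestamp, long offset, long size, long slide) {
        List<Long> starts = new ArrayList<>((int) (size / slide));
        long lastStart = lastWindowStart(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An event 12 minutes 24 seconds past a full hour (like item A).
        long eventTs = 10 * DAY_MS + HOUR_MS + (12 * 60 + 24) * 1000L;

        // Shared offset 0: the window start snaps to the full hour.
        long aligned = lastWindowStart(eventTs, 0L, HOUR_MS);
        // Per-item offset: the window start keeps the item's own 12:24.
        long itemOffset = offsetFromFirstEvent(eventTs, HOUR_MS);
        long shifted = lastWindowStart(eventTs, itemOffset, HOUR_MS);

        System.out.println("ms past the hour, shared offset:   " + (aligned % HOUR_MS));
        System.out.println("ms past the hour, per-item offset: " + (shifted % HOUR_MS));
        System.out.println("windows containing the event: "
                + windowStarts(eventTs, itemOffset, DAY_MS, HOUR_MS).size());
    }
}
```

With per-item offsets the window ends, and therefore the firing times, are spread across the hour instead of all landing on the same instant.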

Cheers,
Till

On Fri, Aug 25, 2017 at 9:44 AM, Bowen Li <bo...@offerupnow.com> wrote:

> Hi Till,
> What I mean is: can the sliding windows for different items have different
> start times?
>
> Here's an example of what we want:
> - for item A: its first event arrives at 2017/8/24-01:*12:24*, so the 1st
> window should be 2017/8/24-01:*12:24* - 2017/8/25-01:*12:23*, the 2nd
> window would be 2017/8/24-02:*12:24* - 2017/8/25-02:*12:23*, and so on
> - for item B: its first event arrives at 2017/8/24-01:*10:20*, so the 1st
> window should be 2017/8/24-01:*10:20* - 2017/8/25-01:*10:19*, the 2nd
> window would be 2017/8/24-02:*10:20* - 2017/8/25-02:*10:19*, and so on.
>
> But we observed that what Flink does is: for both A and B, their own
> unique time offsets within the hour (*12:24 and 10:20*) are eliminated by
> Flink, and the windows are unified to be like 2017/8/24-01:*00:00* -
> 2017/8/25-01:*00:00*, 2017/8/24-02:*00:00* - 2017/8/25-02:*00:00*, and so
> on.
>
> Unifying the start time of windows for all items causes us trouble. It
> means 20 million windows are triggered and fired at the same time, and the
> downstream Kinesis sink cannot handle the amount of output. We actually
> want windows for different items to be triggered and fired at different
> times within an hour, so we can even out the amount of output to the
> downstream Kinesis sink, as my ASCII charts demonstrated.
>
> Does my example make sense?
>
> Thanks,
> Bowen
>
> On Fri, Aug 25, 2017 at 12:01 AM, Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Bowen,
>>
>> having a sliding window of one day with a slide of one hour basically
>> means that each window is closed after 24 hours and the next closing
>> happens one hour later. Only when the window is closed/triggered, you
>> compute the window function which generates the window output. That's why
>> you see the spikes in your load and it's basically caused by the program
>> semantics.
>>
>> What do you mean by burning down the underlying KPL? If KPL has a max
>> throughput, then the FlinkKinesisProducer should ideally respect that.
>>
>> nice ASCII art btw :-)
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 25, 2017 at 6:20 AM, Bowen Li <bo...@offerupnow.com>
>> wrote:
>>
>>> Hi Till,
>>>
>>> Thank you very much for looking into it! According to our investigation,
>>> this is indeed a Kinesis issue. Flink (FlinkKinesisProducer) uses the
>>> KPL (Kinesis Producer Library), but hasn't tuned it yet. I have identified
>>> a bunch of issues, opened the following Flink tickets, and am working on
>>> them.
>>>
>>>
>>>    - [FLINK-7367][kinesis connector] Parameterize more configs for
>>>    FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections,
>>>    RequestTimeout, etc)
>>>    - [FLINK-7366][kinesis connector] Upgrade kinesis producer library
>>>    in flink-connector-kinesis
>>>    - [FLINK-7508] switch FlinkKinesisProducer to use KPL's
>>>    ThreadingMode to ThreadedPool mode rather than Per_Request mode
>>>
>>>
>>>     I do have a question about Flink performance. We are using a 1-day
>>> sliding window with a 1-hour slide to count some features of items
>>> based on event time. We have about 20 million items. We observed that Flink
>>> only emits results at a fixed time each hour (e.g. 1am, 2am, 3am, or
>>> 1:15am, 2:15am, 3:15am with a 15min offset). That means 20 million
>>> windows/records are generated at the same time every hour, which burns down
>>> FlinkKinesisProducer and the underlying KPL, but nothing is generated in
>>> the rest of that hour. The pattern is like this:
>>>
>>> load
>>> |
>>> |    /\                  /\
>>> |   /  \                /  \
>>> |_/_  \_______/__\_
>>>                                  time
>>>
>>>  Is there any way to even out the number of generated windows/records in
>>> an hour? Can we have evenly distributed generated load like this?
>>>
>>>  load
>>> |
>>> |
>>> | ------------------------
>>> |_______________
>>>                                  time
>>>
>>>
>>> Thanks,
>>> Bowen
>>>
>>> On Tue, Aug 22, 2017 at 2:56 AM, Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Bowen,
>>>>
>>>> sorry for my late answer. I dug through some of the logs and it seems
>>>> that you have the following problem:
>>>>
>>>>    1.
>>>>
>>>>    Once in a while the Kinesis producer fails with a
>>>>    UserRecordFailedException saying “Expired while waiting in HttpClient queue
>>>>    Record has reached expiration”. This seems to be a problem on the Kinesis
>>>>    side. This will trigger the task failure and the cancellation of all other
>>>>    tasks as well.
>>>>    2.
>>>>
>>>>    Somehow Flink does not manage to cancel all tasks within a period
>>>>    of 180 seconds. This value is configurable via
>>>>    task.cancellation.timeout (unit ms) in the Flink configuration. It
>>>>    looks a bit like you have a lot of logging going on, because the code
>>>>    is waiting, for example, on Category.java:204 and other log4j methods.
>>>>    This could, however, also mask the true issue. What you could do is try
>>>>    out a different logging backend such as logback [1].
>>>>    3.
>>>>
>>>>    The failing cancellation is a fatal error which leads to the
>>>>    termination of the TaskManager. This will be noticed by the
>>>>    YarnResourceManager and it will restart the container. This goes on until
>>>>    it reaches the maximum number of failed containers. This value can be
>>>>    configured via yarn.maximum-failed-containers. By default it is
>>>>    the number of initial containers you requested. If you set this value to
>>>>    -1, then it will never fail and always restart failed containers.
>>>>    Once the maximum is reached, Flink terminates the Yarn application.
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-logback-instead-of-log4j
>>>>
>>>> In order to further debug the problem, which version of Flink are you
>>>> using? Could you also provide us with the debug-level logs of the
>>>> TaskManagers?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>>
>>>> On Fri, Aug 11, 2017 at 5:37 AM, Bowen Li <bo...@offerupnow.com>
>>>> wrote:
>>>>
>>>>> Hi Till,
>>>>>     Any idea why it happened? I've tried different configurations for
>>>>> configuring our Flink cluster, but the cluster always fails after 4 or 5
>>>>> hours.
>>>>>
>>>>>     According to the log, it looks like the total number of slots becomes
>>>>> 0 at the end, and YarnClusterClient shuts down the application master as
>>>>> a result. Why are the slots not released? Or have they actually
>>>>> crashed and are thus no longer available?
>>>>>
>>>>> I'm trying to deploy the first Flink cluster within our company, and
>>>>> this issue is slowing down our effort to prove that Flink actually works
>>>>> for us. We'd appreciate your help on it!
>>>>>
>>>>> Thanks,
>>>>> Bowen
>>>>>
>>>>> On Wed, Aug 9, 2017 at 1:33 PM, Bowen Li <bo...@offerupnow.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Till,
>>>>>>     Thanks for taking this issue.
>>>>>>
>>>>>>     We are not comfortable sending logs to an email list that is this
>>>>>> open. I'll send the logs to you.
>>>>>>
>>>>>> Thanks,
>>>>>> Bowen
>>>>>>
>>>>>>
>>>>>> On Wed, Aug 9, 2017 at 2:46 AM, Till Rohrmann <tr...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Bowen,
>>>>>>>
>>>>>>> if I'm not mistaken, then Flink's current Yarn implementation does
>>>>>>> not actively release containers. The `YarnFlinkResourceManager` is started
>>>>>>> with a fixed number of containers it always tries to acquire. If a
>>>>>>> container should die, then it will request a new one.
>>>>>>>
>>>>>>> In case of a failure all slots should be freed and then they should
>>>>>>> be available for scheduling the new tasks. Thus, it is not necessarily the
>>>>>>> case that 12 new slots will be used unless the old slots are no longer
>>>>>>> available (failure of a TM). Therefore, what you are describing sounds
>>>>>>> like a bug. Could you share the logs with us?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Wed, Aug 9, 2017 at 9:32 AM, Bowen Li <bo...@offerupnow.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi guys,
>>>>>>>>     I was running a Flink job (parallelism 12) on an EMR cluster
>>>>>>>> with 48 YARN slots. When the job started, I could see from the Flink UI
>>>>>>>> that the job took 12 slots, and 36 slots were left available.
>>>>>>>>
>>>>>>>>     I would expect that when the job fails, it would restart from
>>>>>>>> the checkpoint by taking another 12 slots and freeing the original 12 slots. *Well,
>>>>>>>> I observed that the job took new slots but never freed the original slots. The
>>>>>>>> Flink job ended up being killed by YARN because there were no available slots
>>>>>>>> anymore.*
>>>>>>>>
>>>>>>>>      Here's the command I used to run the Flink job:
>>>>>>>>
>>>>>>>>      ```
>>>>>>>>      flink run -m yarn-cluster -yn 6 -ys 8 -ytm 40000  xxx.jar
>>>>>>>>      ```
>>>>>>>>
>>>>>>>>      Does anyone know what's going wrong?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Bowen