Posted to user@flink.apache.org by Bowen Li <bo...@offerupnow.com> on 2017/08/09 07:32:34 UTC

Flink doesn't free YARN slots after restarting

Hi guys,
    I was running a Flink job (parallelism of 12) on an EMR cluster with 48
YARN slots. When the job started, I could see from the Flink UI that the job
took 12 slots, leaving 36 slots available.

    I would expect that when the job fails, it would restart from a
checkpoint by taking another 12 slots and freeing the original 12. *Well, I
observed that the job took new slots but never freed the original ones. The
Flink job ended up being killed by YARN because there were no available
slots anymore.*

     Here's the command I used to run the Flink job:

     ```
     flink run -m yarn-cluster -yn 6 -ys 8 -ytm 40000  xxx.jar
     ```

     Does anyone know what's going wrong?

Thanks,
Bowen

Re: Flink doesn't free YARN slots after restarting

Posted by Till Rohrmann <tr...@apache.org>.
Sorry for my late answer, Bowen.

I think this only works if you implement your own WindowAssigner. With the
built-in sliding window this is not possible since all windows have the
same offset.
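
As a rough illustration of that idea, here is a minimal sketch of a sliding
assigner that derives the window offset from the element itself. The class
name, the Tuple2<String, Long> element shape (item id in field f0), and the
hash-based offset are assumptions for the example, not existing Flink code:

```
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Sliding event-time windows whose start is shifted per item key, so that
 * windows for different items close (and fire) at different offsets.
 */
public class PerKeyOffsetSlidingWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size;   // window size in ms, e.g. 24 * 60 * 60 * 1000
    private final long slide;  // slide in ms, e.g. 60 * 60 * 1000

    public PerKeyOffsetSlidingWindows(long size, long slide) {
        this.size = size;
        this.slide = slide;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        // Assumed element shape: Tuple2<itemId, value>; the item id picks a
        // stable offset within one slide, so each item keeps its own phase.
        @SuppressWarnings("unchecked")
        Tuple2<String, Long> record = (Tuple2<String, Long>) element;
        long offset = (record.f0.hashCode() & Integer.MAX_VALUE) % slide;

        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
```

A stream keyed by the item id could then use it with something like
.window(new PerKeyOffsetSlidingWindows(Time.days(1).toMilliseconds(),
Time.hours(1).toMilliseconds())), so that each item's windows fire at that
item's own offset within the hour.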

Cheers,
Till

On Fri, Aug 25, 2017 at 9:44 AM, Bowen Li <bo...@offerupnow.com> wrote:

> Hi Till,
> What I mean is: can the sliding windows for different item have different
> start time?
>
> Here's an example of what we want:
> - for item A: its first event arrives at 2017/8/24-01:*12:24*, so the 1st
> window should be 2017/8/24-01:*12:24* - 2017/8/25-01:*12:23*, the 2nd
> window would be 2017/8/24-02:*12:24* - 2017/8/25-02:*12:23*, and so on
> - for item B: its first event arrives at 2017/8/24-01:*10:20*, so the 1st
> window should be 2017/8/24-01:*10:20* - 2017/8/25-01:*10:19*, the 2nd
> window would be 2017/8/24-02:*10:20* - 2017/8/25-02:*10:19*, and so on.
>
> But we observed that what Flink does is: for both A and B, their own
> unique time offset within an hour (*12:24 and 10:20*) are eliminated by
> Flink, and windows are unified to be like 2017/8/24-01:*00:00* -
> 2017/8/25-01:*00:00*, 2017/8/24-02:*00:00* - 2017/8/25-02:*00:00*, and so
> on.
>
> Unifying the starting time of windows for all items brings us trouble. It
> means 20million windows are triggered and fired at same time, and the
> downstream Kinesis sink cannot handle the amount of output. We actually
> want windows for different items to be triggered and fired at different
> time within an hour, so we can even out the amount of output to downstream
> Kinesis sink, as my ASCII charts demonstrated.
>
> Does my example make sense?
>
> Thanks,
> Bowen
>
> On Fri, Aug 25, 2017 at 12:01 AM, Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Bowen,
>>
>> having a sliding window of one day with a slide of one hour basically
>> means that each window is closed after 24 hours and the next closing
>> happens one hour later. Only when the window is closed/triggered, you
>> compute the window function which generates the window output. That's why
>> you see the spikes in your load and it's basically caused by the program
>> semantics.
>>
>> What do you mean by burning down the underlying KPL? If KPL has a max
>> throughput, then the FlinkKinesisProducer should ideally respect that.
>>
>> nice ASCII art btw :-)
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 25, 2017 at 6:20 AM, Bowen Li <bo...@offerupnow.com>
>> wrote:
>>
>>> Hi Till,
>>>
>>> Thank you very much for looking into it! According to our investigation,
>>> this is indeed a Kinesis issue. Flink (FlinkKinesisProducer) uses
>>> KPL(Kinesis Producer Library), but hasn't tune it up yet. I have identified
>>> a bunch of issues, opened the following Flink tickets, and are working on
>>> them.
>>>
>>>
>>>    - [FLINK-7367][kinesis connector] Parameterize more configs for
>>>    FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections,
>>>    RequestTimeout, etc)
>>>    - [FLINK-7366][kinesis connector] Upgrade kinesis producer library
>>>    in flink-connector-kinesis
>>>    - [FLINK-7508] switch FlinkKinesisProducer to use KPL's
>>>    ThreadingMode to ThreadedPool mode rather than Per_Request mode
>>>
>>>
>>>     I do have a question for Flink performance. We are using a 1-day
>>> sized sliding window with 1-hour slide to count some features of items
>>> based on event time. We have about 20million items. We observed that Flink
>>> only emit results on a fixed time in an hour (e.g. 1am, 2am, 3am,  or
>>> 1:15am, 2:15am, 3:15am with a 15min offset). That's means 20million
>>> windows/records are generated at the same time every hour, which burns down
>>> FlinkKinesisProducer and the underlying KPL, but nothing is generated in
>>> the rest of that hour. The pattern is like this:
>>>
>>> load
>>> |
>>> |    /\                  /\
>>> |   /  \                /  \
>>> |_/_  \_______/__\_
>>>                                  time
>>>
>>>  Is there any way to even out the number of generated windows/records in
>>> an hour? Can we have evenly distributed generated load like this?
>>>
>>>  load
>>> |
>>> |
>>> | ------------------------
>>> |_______________
>>>                                  time
>>>
>>>
>>> Thanks,
>>> Bowen
>>>
>>> On Tue, Aug 22, 2017 at 2:56 AM, Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Bowen,
>>>>
>>>> sorry for my late answer. I dug through some of the logs and it seems
>>>> that you have the following problem:
>>>>
>>>>    1.
>>>>
>>>>    Once in a while the Kinesis producer fails with a
>>>>    UserRecordFailedException saying “Expired while waiting in HttpClient queue
>>>>    Record has reached expiration”. This seems to be a problem on the Kinesis
>>>>    side. This will trigger the task failure and the cancellation of all other
>>>>    tasks as well.
>>>>    2.
>>>>
>>>>    Somehow Flink does not manage to cancel all tasks within a period
>>>>    of 180 seconds. This value is configurable via
>>>>    task.cancellation.timeout (unit ms) via the Flink configuration. It
>>>>    looks a bit like you have a lot of logging going on, because the the code
>>>>    is waiting for example on Category.java:204 and other log4j methods. This
>>>>    could, however also cover the true issue. What you could do is to try out a
>>>>    different logging backend such as logback [1], for example.
>>>>    3.
>>>>
>>>>    The failing cancellation is a fatal error which leads to the
>>>>    termination of the TaskManager. This will be notified by the
>>>>    YarnResourceManager and it will restart the container. This goes on until
>>>>    it reaches the number of maximum failed containers. This value can be
>>>>    configured via yarn.maximum-failed-containers. Per default it is
>>>>    the number of initial containers you requested. If you set this value to
>>>>    -1, then it will never fail and always restart failed containers.
>>>>    Once the maximum is reached, Flink terminates the Yarn application.
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>>> dev/best_practices.html#using-logback-instead-of-log4j
>>>>
>>>> In order to further debug the problem, which version of Flink are you
>>>> using and maybe you could provide us with the debug log level logs of the
>>>> TaskManagers.
>>>>
>>>> Cheers,
>>>> Till
>>>> ​
>>>>
>>>> On Fri, Aug 11, 2017 at 5:37 AM, Bowen Li <bo...@offerupnow.com>
>>>> wrote:
>>>>
>>>>> Hi Till,
>>>>>     Any idea why it happened? I've tried different configurations for
>>>>> configuring our Flink cluster, but the cluster always fails after 4 or 5
>>>>> hours.
>>>>>
>>>>>     According to the log, looks like the total number of slots becomes
>>>>> 0 at the end, and YarnClusterClient shuts down application master as
>>>>> a result. Why the slots are not released? Or are they actually
>>>>> crushed and thus no longer available?
>>>>>
>>>>> I'm trying to deploy the first Flink cluster within out company. And
>>>>> this issue is slowing us down from proving that Flink actually works for
>>>>> us. We'd appreciate your help on it!
>>>>>
>>>>> Thanks,
>>>>> Bowen
>>>>>
>>>>> On Wed, Aug 9, 2017 at 1:33 PM, Bowen Li <bo...@offerupnow.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Till,
>>>>>>     Thanks for taking this issue.
>>>>>>
>>>>>>     We are not comfortable sending logs to a email list which is this
>>>>>> open. I'll send logs to you.
>>>>>>
>>>>>> Thanks,
>>>>>> Bowen
>>>>>>
>>>>>>
>>>>>> On Wed, Aug 9, 2017 at 2:46 AM, Till Rohrmann <tr...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Bowen,
>>>>>>>
>>>>>>> if I'm not mistaken, then Flink's current Yarn implementation does
>>>>>>> not actively releases containers. The `YarnFlinkResourceManager` is started
>>>>>>> with a fixed number of containers it always tries to acquire. If a
>>>>>>> container should die, then it will request a new one.
>>>>>>>
>>>>>>> In case of a failure all slots should be freed and then they should
>>>>>>> be subject to rescheduling the new tasks. Thus, it is not necessarily the
>>>>>>> case that 12 new slots will be used unless the old slots are no longer
>>>>>>> available (failure of a TM). Therefore, it sounds like a bug what you are
>>>>>>> describing. Could you share the logs with us?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Wed, Aug 9, 2017 at 9:32 AM, Bowen Li <bo...@offerupnow.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi guys,
>>>>>>>>     I was running a Flink job (12 parallelism) on an EMR cluster
>>>>>>>> with 48 YARN slots. When the job starts, I can see from Flink UI that the
>>>>>>>> job took 12 slots, and 36 slots were left available.
>>>>>>>>
>>>>>>>>     I would expect that when the job fails, it would restart from
>>>>>>>> checkpointing by taking another 12 slots and freeing the original 12 slots. *Well,
>>>>>>>> I observed that the job took new slots but never free original slots. The
>>>>>>>> Flink job ended up killed by YARN because there's no available slots
>>>>>>>> anymore.*
>>>>>>>>
>>>>>>>>      Here's the command I ran Flink job:
>>>>>>>>
>>>>>>>>      ```
>>>>>>>>      flink run -m yarn-cluster -yn 6 -ys 8 -ytm 40000  xxx.jar
>>>>>>>>      ```
>>>>>>>>
>>>>>>>>      Does anyone know what's going wrong?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Bowen
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Flink doesn't free YARN slots after restarting

Posted by Bowen Li <bo...@offerupnow.com>.
Hi Till,
What I mean is: can the sliding windows for different items have different
start times?

Here's an example of what we want:
- for item A: its first event arrives at 2017/8/24-01:*12:24*, so the 1st
window should be 2017/8/24-01:*12:24* - 2017/8/25-01:*12:23*, the 2nd
window would be 2017/8/24-02:*12:24* - 2017/8/25-02:*12:23*, and so on
- for item B: its first event arrives at 2017/8/24-01:*10:20*, so the 1st
window should be 2017/8/24-01:*10:20* - 2017/8/25-01:*10:19*, the 2nd
window would be 2017/8/24-02:*10:20* - 2017/8/25-02:*10:19*, and so on.

But we observed that what Flink does is: for both A and B, their own unique
time offsets within the hour (*12:24 and 10:20*) are discarded by Flink, and
the windows are unified to be like 2017/8/24-01:*00:00* - 2017/8/25-01:*00:00*,
2017/8/24-02:*00:00* - 2017/8/25-02:*00:00*, and so on.

Unifying the start time of windows for all items causes us trouble. It
means 20 million windows are triggered and fired at the same time, and the
downstream Kinesis sink cannot handle that amount of output. We actually
want windows for different items to be triggered and fired at different
times within the hour, so we can even out the amount of output to the
downstream Kinesis sink, as my ASCII charts demonstrated.

Does my example make sense?

Thanks,
Bowen

On Fri, Aug 25, 2017 at 12:01 AM, Till Rohrmann <tr...@apache.org>
wrote:

> Hi Bowen,
>
> having a sliding window of one day with a slide of one hour basically
> means that each window is closed after 24 hours and the next closing
> happens one hour later. Only when the window is closed/triggered, you
> compute the window function which generates the window output. That's why
> you see the spikes in your load and it's basically caused by the program
> semantics.
>
> What do you mean by burning down the underlying KPL? If KPL has a max
> throughput, then the FlinkKinesisProducer should ideally respect that.
>
> nice ASCII art btw :-)
>
> Cheers,
> Till
>
> On Fri, Aug 25, 2017 at 6:20 AM, Bowen Li <bo...@offerupnow.com> wrote:
>
>> Hi Till,
>>
>> Thank you very much for looking into it! According to our investigation,
>> this is indeed a Kinesis issue. Flink (FlinkKinesisProducer) uses
>> KPL(Kinesis Producer Library), but hasn't tune it up yet. I have identified
>> a bunch of issues, opened the following Flink tickets, and are working on
>> them.
>>
>>
>>    - [FLINK-7367][kinesis connector] Parameterize more configs for
>>    FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections,
>>    RequestTimeout, etc)
>>    - [FLINK-7366][kinesis connector] Upgrade kinesis producer library in
>>    flink-connector-kinesis
>>    - [FLINK-7508] switch FlinkKinesisProducer to use KPL's ThreadingMode
>>    to ThreadedPool mode rather than Per_Request mode
>>
>>
>>     I do have a question for Flink performance. We are using a 1-day
>> sized sliding window with 1-hour slide to count some features of items
>> based on event time. We have about 20million items. We observed that Flink
>> only emit results on a fixed time in an hour (e.g. 1am, 2am, 3am,  or
>> 1:15am, 2:15am, 3:15am with a 15min offset). That's means 20million
>> windows/records are generated at the same time every hour, which burns down
>> FlinkKinesisProducer and the underlying KPL, but nothing is generated in
>> the rest of that hour. The pattern is like this:
>>
>> load
>> |
>> |    /\                  /\
>> |   /  \                /  \
>> |_/_  \_______/__\_
>>                                  time
>>
>>  Is there any way to even out the number of generated windows/records in
>> an hour? Can we have evenly distributed generated load like this?
>>
>>  load
>> |
>> |
>> | ------------------------
>> |_______________
>>                                  time
>>
>>
>> Thanks,
>> Bowen
>>
>> On Tue, Aug 22, 2017 at 2:56 AM, Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Bowen,
>>>
>>> sorry for my late answer. I dug through some of the logs and it seems
>>> that you have the following problem:
>>>
>>>    1.
>>>
>>>    Once in a while the Kinesis producer fails with a
>>>    UserRecordFailedException saying “Expired while waiting in HttpClient queue
>>>    Record has reached expiration”. This seems to be a problem on the Kinesis
>>>    side. This will trigger the task failure and the cancellation of all other
>>>    tasks as well.
>>>    2.
>>>
>>>    Somehow Flink does not manage to cancel all tasks within a period of
>>>    180 seconds. This value is configurable via task.cancellation.timeout
>>>    (unit ms) via the Flink configuration. It looks a bit like you have a lot
>>>    of logging going on, because the the code is waiting for example on
>>>    Category.java:204 and other log4j methods. This could, however also cover
>>>    the true issue. What you could do is to try out a different logging backend
>>>    such as logback [1], for example.
>>>    3.
>>>
>>>    The failing cancellation is a fatal error which leads to the
>>>    termination of the TaskManager. This will be notified by the
>>>    YarnResourceManager and it will restart the container. This goes on until
>>>    it reaches the number of maximum failed containers. This value can be
>>>    configured via yarn.maximum-failed-containers. Per default it is the
>>>    number of initial containers you requested. If you set this value to
>>>    -1, then it will never fail and always restart failed containers.
>>>    Once the maximum is reached, Flink terminates the Yarn application.
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>> dev/best_practices.html#using-logback-instead-of-log4j
>>>
>>> In order to further debug the problem, which version of Flink are you
>>> using and maybe you could provide us with the debug log level logs of the
>>> TaskManagers.
>>>
>>> Cheers,
>>> Till
>>> ​
>>>
>>> On Fri, Aug 11, 2017 at 5:37 AM, Bowen Li <bo...@offerupnow.com>
>>> wrote:
>>>
>>>> Hi Till,
>>>>     Any idea why it happened? I've tried different configurations for
>>>> configuring our Flink cluster, but the cluster always fails after 4 or 5
>>>> hours.
>>>>
>>>>     According to the log, looks like the total number of slots becomes
>>>> 0 at the end, and YarnClusterClient shuts down application master as a
>>>> result. Why the slots are not released? Or are they actually crushed
>>>> and thus no longer available?
>>>>
>>>> I'm trying to deploy the first Flink cluster within out company. And
>>>> this issue is slowing us down from proving that Flink actually works for
>>>> us. We'd appreciate your help on it!
>>>>
>>>> Thanks,
>>>> Bowen
>>>>
>>>> On Wed, Aug 9, 2017 at 1:33 PM, Bowen Li <bo...@offerupnow.com>
>>>> wrote:
>>>>
>>>>> Hi Till,
>>>>>     Thanks for taking this issue.
>>>>>
>>>>>     We are not comfortable sending logs to a email list which is this
>>>>> open. I'll send logs to you.
>>>>>
>>>>> Thanks,
>>>>> Bowen
>>>>>
>>>>>
>>>>> On Wed, Aug 9, 2017 at 2:46 AM, Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Bowen,
>>>>>>
>>>>>> if I'm not mistaken, then Flink's current Yarn implementation does
>>>>>> not actively releases containers. The `YarnFlinkResourceManager` is started
>>>>>> with a fixed number of containers it always tries to acquire. If a
>>>>>> container should die, then it will request a new one.
>>>>>>
>>>>>> In case of a failure all slots should be freed and then they should
>>>>>> be subject to rescheduling the new tasks. Thus, it is not necessarily the
>>>>>> case that 12 new slots will be used unless the old slots are no longer
>>>>>> available (failure of a TM). Therefore, it sounds like a bug what you are
>>>>>> describing. Could you share the logs with us?
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Aug 9, 2017 at 9:32 AM, Bowen Li <bo...@offerupnow.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi guys,
>>>>>>>     I was running a Flink job (12 parallelism) on an EMR cluster
>>>>>>> with 48 YARN slots. When the job starts, I can see from Flink UI that the
>>>>>>> job took 12 slots, and 36 slots were left available.
>>>>>>>
>>>>>>>     I would expect that when the job fails, it would restart from
>>>>>>> checkpointing by taking another 12 slots and freeing the original 12 slots. *Well,
>>>>>>> I observed that the job took new slots but never free original slots. The
>>>>>>> Flink job ended up killed by YARN because there's no available slots
>>>>>>> anymore.*
>>>>>>>
>>>>>>>      Here's the command I ran Flink job:
>>>>>>>
>>>>>>>      ```
>>>>>>>      flink run -m yarn-cluster -yn 6 -ys 8 -ytm 40000  xxx.jar
>>>>>>>      ```
>>>>>>>
>>>>>>>      Does anyone know what's going wrong?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Bowen
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Flink doesn't free YARN slots after restarting

Posted by Till Rohrmann <tr...@apache.org>.
Hi Bowen,

having a sliding window of one day with a slide of one hour basically means
that each window is closed after 24 hours and the next closing happens one
hour later. Only when the window is closed/triggered is the window function
computed, which generates the window output. That's why you see the spikes
in your load; it's basically caused by the program semantics.
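
For reference, a self-contained sketch of such a job, a 1-day sliding window
with a 1-hour slide, might look like the following (the source, the timestamp
extraction, and the sum aggregation are placeholders, not your actual
pipeline):

```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SlidingWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Placeholder source of (itemId, count) pairs; a real job would read from Kinesis.
        DataStream<Tuple2<String, Long>> events = env
            .fromElements(Tuple2.of("itemA", 1L), Tuple2.of("itemB", 1L))
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                @Override
                public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                    return System.currentTimeMillis(); // placeholder event-time extraction
                }
            });

        events
            .keyBy(0) // key by item id
            // Each window spans 24 hours and a new one starts every hour; results are
            // emitted only when a window closes, which is why the output is spiky.
            .window(SlidingEventTimeWindows.of(Time.days(1), Time.hours(1)))
            .sum(1)
            .print();

        env.execute("sliding-window-sketch");
    }
}
```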

What do you mean by burning down the underlying KPL? If KPL has a max
throughput, then the FlinkKinesisProducer should ideally respect that.

nice ASCII art btw :-)

Cheers,
Till

On Fri, Aug 25, 2017 at 6:20 AM, Bowen Li <bo...@offerupnow.com> wrote:

> Hi Till,
>
> Thank you very much for looking into it! According to our investigation,
> this is indeed a Kinesis issue. Flink (FlinkKinesisProducer) uses
> KPL(Kinesis Producer Library), but hasn't tune it up yet. I have identified
> a bunch of issues, opened the following Flink tickets, and are working on
> them.
>
>
>    - [FLINK-7367][kinesis connector] Parameterize more configs for
>    FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections,
>    RequestTimeout, etc)
>    - [FLINK-7366][kinesis connector] Upgrade kinesis producer library in
>    flink-connector-kinesis
>    - [FLINK-7508] switch FlinkKinesisProducer to use KPL's ThreadingMode
>    to ThreadedPool mode rather than Per_Request mode
>
>
>     I do have a question for Flink performance. We are using a 1-day sized
> sliding window with 1-hour slide to count some features of items based on
> event time. We have about 20million items. We observed that Flink only emit
> results on a fixed time in an hour (e.g. 1am, 2am, 3am,  or 1:15am, 2:15am,
> 3:15am with a 15min offset). That's means 20million windows/records are
> generated at the same time every hour, which burns down
> FlinkKinesisProducer and the underlying KPL, but nothing is generated in
> the rest of that hour. The pattern is like this:
>
> load
> |
> |    /\                  /\
> |   /  \                /  \
> |_/_  \_______/__\_
>                                  time
>
>  Is there any way to even out the number of generated windows/records in
> an hour? Can we have evenly distributed generated load like this?
>
>  load
> |
> |
> | ------------------------
> |_______________
>                                  time
>
>
> Thanks,
> Bowen
>
> On Tue, Aug 22, 2017 at 2:56 AM, Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Bowen,
>>
>> sorry for my late answer. I dug through some of the logs and it seems
>> that you have the following problem:
>>
>>    1.
>>
>>    Once in a while the Kinesis producer fails with a
>>    UserRecordFailedException saying “Expired while waiting in HttpClient queue
>>    Record has reached expiration”. This seems to be a problem on the Kinesis
>>    side. This will trigger the task failure and the cancellation of all other
>>    tasks as well.
>>    2.
>>
>>    Somehow Flink does not manage to cancel all tasks within a period of
>>    180 seconds. This value is configurable via task.cancellation.timeout
>>    (unit ms) via the Flink configuration. It looks a bit like you have a lot
>>    of logging going on, because the the code is waiting for example on
>>    Category.java:204 and other log4j methods. This could, however also cover
>>    the true issue. What you could do is to try out a different logging backend
>>    such as logback [1], for example.
>>    3.
>>
>>    The failing cancellation is a fatal error which leads to the
>>    termination of the TaskManager. This will be notified by the
>>    YarnResourceManager and it will restart the container. This goes on until
>>    it reaches the number of maximum failed containers. This value can be
>>    configured via yarn.maximum-failed-containers. Per default it is the
>>    number of initial containers you requested. If you set this value to
>>    -1, then it will never fail and always restart failed containers.
>>    Once the maximum is reached, Flink terminates the Yarn application.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> dev/best_practices.html#using-logback-instead-of-log4j
>>
>> In order to further debug the problem, which version of Flink are you
>> using and maybe you could provide us with the debug log level logs of the
>> TaskManagers.
>>
>> Cheers,
>> Till
>> ​
>>
>> On Fri, Aug 11, 2017 at 5:37 AM, Bowen Li <bo...@offerupnow.com>
>> wrote:
>>
>>> Hi Till,
>>>     Any idea why it happened? I've tried different configurations for
>>> configuring our Flink cluster, but the cluster always fails after 4 or 5
>>> hours.
>>>
>>>     According to the log, looks like the total number of slots becomes 0
>>> at the end, and YarnClusterClient shuts down application master as a
>>> result. Why the slots are not released? Or are they actually crushed
>>> and thus no longer available?
>>>
>>> I'm trying to deploy the first Flink cluster within out company. And
>>> this issue is slowing us down from proving that Flink actually works for
>>> us. We'd appreciate your help on it!
>>>
>>> Thanks,
>>> Bowen
>>>
>>> On Wed, Aug 9, 2017 at 1:33 PM, Bowen Li <bo...@offerupnow.com>
>>> wrote:
>>>
>>>> Hi Till,
>>>>     Thanks for taking this issue.
>>>>
>>>>     We are not comfortable sending logs to a email list which is this
>>>> open. I'll send logs to you.
>>>>
>>>> Thanks,
>>>> Bowen
>>>>
>>>>
>>>> On Wed, Aug 9, 2017 at 2:46 AM, Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Bowen,
>>>>>
>>>>> if I'm not mistaken, then Flink's current Yarn implementation does not
>>>>> actively releases containers. The `YarnFlinkResourceManager` is started
>>>>> with a fixed number of containers it always tries to acquire. If a
>>>>> container should die, then it will request a new one.
>>>>>
>>>>> In case of a failure all slots should be freed and then they should be
>>>>> subject to rescheduling the new tasks. Thus, it is not necessarily the case
>>>>> that 12 new slots will be used unless the old slots are no longer available
>>>>> (failure of a TM). Therefore, it sounds like a bug what you are describing.
>>>>> Could you share the logs with us?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Aug 9, 2017 at 9:32 AM, Bowen Li <bo...@offerupnow.com>
>>>>> wrote:
>>>>>
>>>>>> Hi guys,
>>>>>>     I was running a Flink job (12 parallelism) on an EMR cluster with
>>>>>> 48 YARN slots. When the job starts, I can see from Flink UI that the job
>>>>>> took 12 slots, and 36 slots were left available.
>>>>>>
>>>>>>     I would expect that when the job fails, it would restart from
>>>>>> checkpointing by taking another 12 slots and freeing the original 12 slots. *Well,
>>>>>> I observed that the job took new slots but never free original slots. The
>>>>>> Flink job ended up killed by YARN because there's no available slots
>>>>>> anymore.*
>>>>>>
>>>>>>      Here's the command I ran Flink job:
>>>>>>
>>>>>>      ```
>>>>>>      flink run -m yarn-cluster -yn 6 -ys 8 -ytm 40000  xxx.jar
>>>>>>      ```
>>>>>>
>>>>>>      Does anyone know what's going wrong?
>>>>>>
>>>>>> Thanks,
>>>>>> Bowen
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Flink doesn't free YARN slots after restarting

Posted by Bowen Li <bo...@offerupnow.com>.
Hi Till,

Thank you very much for looking into it! According to our investigation,
this is indeed a Kinesis issue. Flink (FlinkKinesisProducer) uses the KPL
(Kinesis Producer Library) but hasn't tuned it yet. I have identified a
bunch of issues, opened the following Flink tickets, and am working on
them.


   - [FLINK-7367][kinesis connector] Parameterize more configs for
   FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections,
   RequestTimeout, etc); see the sketch after this list
   - [FLINK-7366][kinesis connector] Upgrade kinesis producer library in
   flink-connector-kinesis
   - [FLINK-7508] switch FlinkKinesisProducer's KPL ThreadingModel from
   Per_Request to ThreadedPool mode
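
For context, the knobs named in FLINK-7367 are settings on the KPL's own
KinesisProducerConfiguration. A standalone KPL sketch with placeholder values
might look like the following; note this is plain KPL code, and the current
FlinkKinesisProducer does not yet expose these settings, which is exactly
what the first ticket is about:

```
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

public class KplConfigSketch {
    public static void main(String[] args) {
        // Placeholder values; sensible settings depend on shard count and record size.
        KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                .setRegion("us-west-2")
                .setRecordMaxBufferedTime(100)    // ms a record may wait in the KPL buffer
                .setMaxConnections(24)            // HTTP connections to the Kinesis backend
                .setRequestTimeout(60000)         // ms before "Expired while waiting in HttpClient queue"
                .setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED)
                .setThreadPoolSize(10);           // only used with the pooled threading model

        KinesisProducer producer = new KinesisProducer(config);
        producer.destroy();
    }
}
```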


    I do have a question about Flink performance. We are using a 1-day
sliding window with a 1-hour slide to count some features of items based on
event time. We have about 20 million items. We observed that Flink only
emits results at a fixed time within each hour (e.g. 1am, 2am, 3am, or
1:15am, 2:15am, 3:15am with a 15-min offset). That means 20 million
windows/records are generated at the same time every hour, which overwhelms
FlinkKinesisProducer and the underlying KPL, while nothing is generated
during the rest of the hour. The pattern looks like this:

load
|
|    /\                  /\
|   /  \                /  \
|__/    \______________/    \___
                                 time

 Is there any way to even out the number of generated windows/records
within an hour? Can we have evenly distributed load like this instead?

 load
|
|
| ------------------------
|_______________
                                 time


Thanks,
Bowen

On Tue, Aug 22, 2017 at 2:56 AM, Till Rohrmann <tr...@apache.org> wrote:

> Hi Bowen,
>
> sorry for my late answer. I dug through some of the logs and it seems that
> you have the following problem:
>
>    1.
>
>    Once in a while the Kinesis producer fails with a
>    UserRecordFailedException saying “Expired while waiting in HttpClient queue
>    Record has reached expiration”. This seems to be a problem on the Kinesis
>    side. This will trigger the task failure and the cancellation of all other
>    tasks as well.
>    2.
>
>    Somehow Flink does not manage to cancel all tasks within a period of
>    180 seconds. This value is configurable via task.cancellation.timeout
>    (unit ms) via the Flink configuration. It looks a bit like you have a lot
>    of logging going on, because the the code is waiting for example on
>    Category.java:204 and other log4j methods. This could, however also cover
>    the true issue. What you could do is to try out a different logging backend
>    such as logback [1], for example.
>    3.
>
>    The failing cancellation is a fatal error which leads to the
>    termination of the TaskManager. This will be notified by the
>    YarnResourceManager and it will restart the container. This goes on until
>    it reaches the number of maximum failed containers. This value can be
>    configured via yarn.maximum-failed-containers. Per default it is the
>    number of initial containers you requested. If you set this value to -1,
>    then it will never fail and always restart failed containers. Once the
>    maximum is reached, Flink terminates the Yarn application.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_
> practices.html#using-logback-instead-of-log4j
>
> In order to further debug the problem, which version of Flink are you
> using and maybe you could provide us with the debug log level logs of the
> TaskManagers.
>
> Cheers,
> Till
> ​
>
> On Fri, Aug 11, 2017 at 5:37 AM, Bowen Li <bo...@offerupnow.com> wrote:
>
>> Hi Till,
>>     Any idea why it happened? I've tried different configurations for
>> configuring our Flink cluster, but the cluster always fails after 4 or 5
>> hours.
>>
>>     According to the log, looks like the total number of slots becomes 0
>> at the end, and YarnClusterClient shuts down application master as a
>> result. Why the slots are not released? Or are they actually crushed and
>> thus no longer available?
>>
>> I'm trying to deploy the first Flink cluster within out company. And this
>> issue is slowing us down from proving that Flink actually works for us.
>> We'd appreciate your help on it!
>>
>> Thanks,
>> Bowen
>>
>> On Wed, Aug 9, 2017 at 1:33 PM, Bowen Li <bo...@offerupnow.com> wrote:
>>
>>> Hi Till,
>>>     Thanks for taking this issue.
>>>
>>>     We are not comfortable sending logs to a email list which is this
>>> open. I'll send logs to you.
>>>
>>> Thanks,
>>> Bowen
>>>
>>>
>>> On Wed, Aug 9, 2017 at 2:46 AM, Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Bowen,
>>>>
>>>> if I'm not mistaken, then Flink's current Yarn implementation does not
>>>> actively releases containers. The `YarnFlinkResourceManager` is started
>>>> with a fixed number of containers it always tries to acquire. If a
>>>> container should die, then it will request a new one.
>>>>
>>>> In case of a failure all slots should be freed and then they should be
>>>> subject to rescheduling the new tasks. Thus, it is not necessarily the case
>>>> that 12 new slots will be used unless the old slots are no longer available
>>>> (failure of a TM). Therefore, it sounds like a bug what you are describing.
>>>> Could you share the logs with us?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Aug 9, 2017 at 9:32 AM, Bowen Li <bo...@offerupnow.com>
>>>> wrote:
>>>>
>>>>> Hi guys,
>>>>>     I was running a Flink job (12 parallelism) on an EMR cluster with
>>>>> 48 YARN slots. When the job starts, I can see from Flink UI that the job
>>>>> took 12 slots, and 36 slots were left available.
>>>>>
>>>>>     I would expect that when the job fails, it would restart from
>>>>> checkpointing by taking another 12 slots and freeing the original 12 slots. *Well,
>>>>> I observed that the job took new slots but never free original slots. The
>>>>> Flink job ended up killed by YARN because there's no available slots
>>>>> anymore.*
>>>>>
>>>>>      Here's the command I ran Flink job:
>>>>>
>>>>>      ```
>>>>>      flink run -m yarn-cluster -yn 6 -ys 8 -ytm 40000  xxx.jar
>>>>>      ```
>>>>>
>>>>>      Does anyone know what's going wrong?
>>>>>
>>>>> Thanks,
>>>>> Bowen
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Flink doesn't free YARN slots after restarting

Posted by Till Rohrmann <tr...@apache.org>.
Hi Bowen,

sorry for my late answer. I dug through some of the logs and it seems that
you have the following problem:

   1.

   Once in a while the Kinesis producer fails with a
   UserRecordFailedException saying “Expired while waiting in HttpClient queue
   Record has reached expiration”. This seems to be a problem on the Kinesis
   side. This will trigger the task failure and the cancellation of all other
   tasks as well.
   2.

   Somehow Flink does not manage to cancel all tasks within a period of 180
   seconds. This value is configurable via task.cancellation.timeout (in ms)
   in the Flink configuration (see the sketch after this list). It looks a
   bit like you have a lot of logging going on, because the code is waiting,
   for example, on Category.java:204 and other log4j methods. This could,
   however, also cover the true issue. What you could do is try out a
   different logging backend such as logback [1], for example.
   3.

   The failing cancellation is a fatal error which leads to the termination
   of the TaskManager. This will be noticed by the YarnResourceManager, which
   will restart the container. This goes on until it reaches the maximum
   number of failed containers. This value can be configured via
   yarn.maximum-failed-containers. By default it is the number of initial
   containers you requested. If you set this value to -1, it will never fail
   and will always restart failed containers. Once the maximum is reached,
   Flink terminates the Yarn application.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-logback-instead-of-log4j
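
As a side note on point 2, and an assumption worth double-checking for your
Flink version: the cancellation settings can also be set per job on the
ExecutionConfig rather than only in the cluster configuration, roughly like
this:

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CancellationTimeoutSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Assumption: these are the per-job counterparts of task.cancellation.timeout
        // and task.cancellation.interval from the Flink configuration (both in ms).
        env.getConfig().setTaskCancellationTimeout(300000L);  // give tasks 5 minutes to cancel
        env.getConfig().setTaskCancellationInterval(30000L);  // retry the cancel call every 30s
    }
}
```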

In order to further debug the problem, could you tell us which version of
Flink you are using, and maybe provide us with DEBUG-level logs of the
TaskManagers?

Cheers,
Till
​

On Fri, Aug 11, 2017 at 5:37 AM, Bowen Li <bo...@offerupnow.com> wrote:

> Hi Till,
>     Any idea why it happened? I've tried different configurations for
> configuring our Flink cluster, but the cluster always fails after 4 or 5
> hours.
>
>     According to the log, looks like the total number of slots becomes 0
> at the end, and YarnClusterClient shuts down application master as a
> result. Why the slots are not released? Or are they actually crushed and
> thus no longer available?
>
> I'm trying to deploy the first Flink cluster within out company. And this
> issue is slowing us down from proving that Flink actually works for us.
> We'd appreciate your help on it!
>
> Thanks,
> Bowen
>
> On Wed, Aug 9, 2017 at 1:33 PM, Bowen Li <bo...@offerupnow.com> wrote:
>
>> Hi Till,
>>     Thanks for taking this issue.
>>
>>     We are not comfortable sending logs to a email list which is this
>> open. I'll send logs to you.
>>
>> Thanks,
>> Bowen
>>
>>
>> On Wed, Aug 9, 2017 at 2:46 AM, Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Bowen,
>>>
>>> if I'm not mistaken, then Flink's current Yarn implementation does not
>>> actively releases containers. The `YarnFlinkResourceManager` is started
>>> with a fixed number of containers it always tries to acquire. If a
>>> container should die, then it will request a new one.
>>>
>>> In case of a failure all slots should be freed and then they should be
>>> subject to rescheduling the new tasks. Thus, it is not necessarily the case
>>> that 12 new slots will be used unless the old slots are no longer available
>>> (failure of a TM). Therefore, it sounds like a bug what you are describing.
>>> Could you share the logs with us?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Aug 9, 2017 at 9:32 AM, Bowen Li <bo...@offerupnow.com>
>>> wrote:
>>>
>>>> Hi guys,
>>>>     I was running a Flink job (12 parallelism) on an EMR cluster with
>>>> 48 YARN slots. When the job starts, I can see from Flink UI that the job
>>>> took 12 slots, and 36 slots were left available.
>>>>
>>>>     I would expect that when the job fails, it would restart from
>>>> checkpointing by taking another 12 slots and freeing the original 12 slots. *Well,
>>>> I observed that the job took new slots but never free original slots. The
>>>> Flink job ended up killed by YARN because there's no available slots
>>>> anymore.*
>>>>
>>>>      Here's the command I ran Flink job:
>>>>
>>>>      ```
>>>>      flink run -m yarn-cluster -yn 6 -ys 8 -ytm 40000  xxx.jar
>>>>      ```
>>>>
>>>>      Does anyone know what's going wrong?
>>>>
>>>> Thanks,
>>>> Bowen
>>>>
>>>
>>>
>>
>

Re: Flink doesn't free YARN slots after restarting

Posted by Bowen Li <bo...@offerupnow.com>.
Hi Till,
    Any idea why it happened? I've tried different configurations for our
Flink cluster, but the cluster always fails after 4 or 5 hours.

    According to the log, it looks like the total number of slots becomes 0
at the end, and YarnClusterClient shuts down the application master as a
result. Why are the slots not released? Or did they actually crash and thus
become no longer available?

I'm trying to deploy the first Flink cluster within our company, and this
issue is slowing us down in proving that Flink actually works for us. We'd
appreciate your help with it!

Thanks,
Bowen

On Wed, Aug 9, 2017 at 1:33 PM, Bowen Li <bo...@offerupnow.com> wrote:

> Hi Till,
>     Thanks for taking this issue.
>
>     We are not comfortable sending logs to a email list which is this
> open. I'll send logs to you.
>
> Thanks,
> Bowen
>
>
> On Wed, Aug 9, 2017 at 2:46 AM, Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Bowen,
>>
>> if I'm not mistaken, then Flink's current Yarn implementation does not
>> actively releases containers. The `YarnFlinkResourceManager` is started
>> with a fixed number of containers it always tries to acquire. If a
>> container should die, then it will request a new one.
>>
>> In case of a failure all slots should be freed and then they should be
>> subject to rescheduling the new tasks. Thus, it is not necessarily the case
>> that 12 new slots will be used unless the old slots are no longer available
>> (failure of a TM). Therefore, it sounds like a bug what you are describing.
>> Could you share the logs with us?
>>
>> Cheers,
>> Till
>>
>> On Wed, Aug 9, 2017 at 9:32 AM, Bowen Li <bo...@offerupnow.com> wrote:
>>
>>> Hi guys,
>>>     I was running a Flink job (12 parallelism) on an EMR cluster with 48
>>> YARN slots. When the job starts, I can see from Flink UI that the job took
>>> 12 slots, and 36 slots were left available.
>>>
>>>     I would expect that when the job fails, it would restart from
>>> checkpointing by taking another 12 slots and freeing the original 12 slots. *Well,
>>> I observed that the job took new slots but never free original slots. The
>>> Flink job ended up killed by YARN because there's no available slots
>>> anymore.*
>>>
>>>      Here's the command I ran Flink job:
>>>
>>>      ```
>>>      flink run -m yarn-cluster -yn 6 -ys 8 -ytm 40000  xxx.jar
>>>      ```
>>>
>>>      Does anyone know what's going wrong?
>>>
>>> Thanks,
>>> Bowen
>>>
>>
>>
>

Re: Flink doesn't free YARN slots after restarting

Posted by Bowen Li <bo...@offerupnow.com>.
Hi Till,
    Thanks for taking on this issue.

    We are not comfortable sending logs to a mailing list this open. I'll
send the logs to you directly.

Thanks,
Bowen


On Wed, Aug 9, 2017 at 2:46 AM, Till Rohrmann <tr...@apache.org> wrote:

> Hi Bowen,
>
> if I'm not mistaken, then Flink's current Yarn implementation does not
> actively releases containers. The `YarnFlinkResourceManager` is started
> with a fixed number of containers it always tries to acquire. If a
> container should die, then it will request a new one.
>
> In case of a failure all slots should be freed and then they should be
> subject to rescheduling the new tasks. Thus, it is not necessarily the case
> that 12 new slots will be used unless the old slots are no longer available
> (failure of a TM). Therefore, it sounds like a bug what you are describing.
> Could you share the logs with us?
>
> Cheers,
> Till
>
> On Wed, Aug 9, 2017 at 9:32 AM, Bowen Li <bo...@offerupnow.com> wrote:
>
>> Hi guys,
>>     I was running a Flink job (12 parallelism) on an EMR cluster with 48
>> YARN slots. When the job starts, I can see from Flink UI that the job took
>> 12 slots, and 36 slots were left available.
>>
>>     I would expect that when the job fails, it would restart from
>> checkpointing by taking another 12 slots and freeing the original 12 slots. *Well,
>> I observed that the job took new slots but never free original slots. The
>> Flink job ended up killed by YARN because there's no available slots
>> anymore.*
>>
>>      Here's the command I ran Flink job:
>>
>>      ```
>>      flink run -m yarn-cluster -yn 6 -ys 8 -ytm 40000  xxx.jar
>>      ```
>>
>>      Does anyone know what's going wrong?
>>
>> Thanks,
>> Bowen
>>
>
>

Re: Flink doesn't free YARN slots after restarting

Posted by Till Rohrmann <tr...@apache.org>.
Hi Bowen,

if I'm not mistaken, Flink's current Yarn implementation does not actively
release containers. The `YarnFlinkResourceManager` is started with a fixed
number of containers that it always tries to keep acquired. If a container
dies, it will request a new one.

In case of a failure, all slots should be freed and then be available for
rescheduling the new tasks. Thus, it is not necessarily the case that 12 new
slots will be used, unless the old slots are no longer available (failure of
a TM). Therefore, what you are describing sounds like a bug. Could you share
the logs with us?

Cheers,
Till

On Wed, Aug 9, 2017 at 9:32 AM, Bowen Li <bo...@offerupnow.com> wrote:

> Hi guys,
>     I was running a Flink job (12 parallelism) on an EMR cluster with 48
> YARN slots. When the job starts, I can see from Flink UI that the job took
> 12 slots, and 36 slots were left available.
>
>     I would expect that when the job fails, it would restart from
> checkpointing by taking another 12 slots and freeing the original 12 slots. *Well,
> I observed that the job took new slots but never free original slots. The
> Flink job ended up killed by YARN because there's no available slots
> anymore.*
>
>      Here's the command I ran Flink job:
>
>      ```
>      flink run -m yarn-cluster -yn 6 -ys 8 -ytm 40000  xxx.jar
>      ```
>
>      Does anyone know what's going wrong?
>
> Thanks,
> Bowen
>