You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Vishal Santoshi <vi...@gmail.com> on 2018/01/21 20:16:14 UTC

AKA and quarantine

There have been a couple of instances where one of our TMs was quarantined
( the cause is irrelevant to this discussion ).  And we had to bounce the
TM to bring back sanity to the cluster.  There have been discussions around
and am trying to distill them. My questions are


*  Based on https://issues.apache.org/jira/browse/FLINK-3347 is it
advisable to set the taskmanager.exit-on-fatal-akka-error  to true. ?

* Is the akka.ask.timeout relevant here ? We could increase the value to
greater than 10s but based on your experiences is it more of a  "mask the
issue" exercise or is 10s generally a low value that *should* be increased ?

* Is it possible or is there some effort being put into per job
memory/resource consumption for a multi job setup that is very normal with
flink ?

* Is there an effort to monitor ROCKSDB useage ( off heap and what not ) ?
It seems a black box to a user as of today.

Thank you and regards.

Re: AKA and quarantine

Posted by Till Rohrmann <tr...@apache.org>.
If you don't run Flink in standalone mode, then you can activate
taskmanager.exit-on-fatal-akka-error. However, keep in mind that at some
point you might run out of spare TMs to run your jobs unless you restart
them manually.

Cheers,
Till

On Mon, Jan 29, 2018 at 6:41 PM, Vishal Santoshi <vi...@gmail.com>
wrote:

> >> If you enable taskmanager.exit-on-fatal-akka-error, then it will stop
> TMs which got quarantined. This will automatically restart TMs in case that
> you are running Flink on Yarn. Thus, I would recommend enabling this if
> possible
>
> We do not use yarn. This would end up restarting the jobs on the remaining
> TMs ( if retry is configured ) may be OK if we have enough resources ?
>
> >>The akka.ask.timeout parameter controls the timeout for remote
> messages. You should only increase this if you observe timeouts between the
> different Flink components. What you can change in order to account for
> heavy load or GC pauses is the heartbeat interval and pause
> via akka.watch.heartbeat.interval and akka.watch.heartbeat.pause. This
> will most likely mitigate the problem of death watch failures.
>
> Will do. I think GC pauses are ephemeral given the nature of  couple of
> our pipelines.
>
> Thank you for looking into the other 2.  The ROCKSDB options stuff is
> interesting and we have known that, but would dig deeper to make this per
> pipeline. We do have sense of how much state is better kept in the memtable
> to prevent a SSTable enquiry.
>
> On Mon, Jan 29, 2018 at 12:13 PM, Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Vishal,
>>
>> Akka usually quarantines remote ActorSystems in case of a system message
>> delivery failure or if the death watch was triggered. This can, for
>> example, happen if your machine is under heavy load or has a high GC
>> pressure and does not find enough time to respond to the heartbeats.
>>
>> - If you enable taskmanager.exit-on-fatal-akka-error, then it will stop
>> TMs which got quarantined. This will automatically restart TMs in case that
>> you are running Flink on Yarn. Thus, I would recommend enabling this if
>> possible
>> - The akka.ask.timeout parameter controls the timeout for remote
>> messages. You should only increase this if you observe timeouts between the
>> different Flink components. What you can change in order to account for
>> heavy load or GC pauses is the heartbeat interval and pause
>> via akka.watch.heartbeat.interval and akka.watch.heartbeat.pause. This
>> will most likely mitigate the problem of death watch failures.
>> - There is an effort to add resource specifications to Flink operators.
>> It is not yet fully implemented but you can take a look at ResourceSpec to
>> see what you can define for each operator. Once fully implemented, Flink
>> will then make sure that each operator gets a slot with enough resources.
>> - For RocksDB's resource consumption there aren't any Flink metrics yet.
>> If you want to learn more about it's resource consumption please take a
>> look here [1]. You can, though, configure the ColumnFamilyOptions by
>> implementing an OptionsFactory. That way you can configure the memtable
>> size which is allocated for each Flink state.
>>
>> [1] https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
>> [2] https://github.com/facebook/rocksdb/wiki/Set-Up-Options
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 29, 2018 at 4:05 PM, Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>> Thank you.
>>>
>>> On Mon, Jan 29, 2018 at 3:17 AM, Fabian Hueske <fh...@gmail.com>
>>> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> sorry for the late response.
>>>> Till (in CC) might be able to answer your Akka / coordination related
>>>> questions.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-01-24 1:22 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>>>
>>>>> Any suggestions ?  I know these are very general issue but these are
>>>>> edge conditions that we want the community to give us general advise on ..
>>>>>
>>>>> On Sun, Jan 21, 2018 at 3:16 PM, Vishal Santoshi <
>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>
>>>>>> There have been a couple of instances where one of our TMs was
>>>>>> quarantined ( the cause is irrelevant to this discussion ).  And we had to
>>>>>> bounce the TM to bring back sanity to the cluster.  There have been
>>>>>> discussions around and am trying to distill them. My questions are
>>>>>>
>>>>>>
>>>>>> *  Based on https://issues.apache.org/jira/browse/FLINK-3347 is it
>>>>>> advisable to set the taskmanager.exit-on-fatal-akka-error  to true.
>>>>>> ?
>>>>>>
>>>>>> * Is the akka.ask.timeout relevant here ? We could increase the value
>>>>>> to greater than 10s but based on your experiences is it more of a  "mask
>>>>>> the issue" exercise or is 10s generally a low value that *should* be
>>>>>> increased ?
>>>>>>
>>>>>> * Is it possible or is there some effort being put into per job
>>>>>> memory/resource consumption for a multi job setup that is very normal with
>>>>>> flink ?
>>>>>>
>>>>>> * Is there an effort to monitor ROCKSDB useage ( off heap and what
>>>>>> not ) ? It seems a black box to a user as of today.
>>>>>>
>>>>>> Thank you and regards.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: AKA and quarantine

Posted by Vishal Santoshi <vi...@gmail.com>.
>> If you enable taskmanager.exit-on-fatal-akka-error, then it will stop
TMs which got quarantined. This will automatically restart TMs in case that
you are running Flink on Yarn. Thus, I would recommend enabling this if
possible

We do not use yarn. This would end up restarting the jobs on the remaining
TMs ( if retry is configured ) may be OK if we have enough resources ?

>>The akka.ask.timeout parameter controls the timeout for remote messages.
You should only increase this if you observe timeouts between the different
Flink components. What you can change in order to account for heavy load or
GC pauses is the heartbeat interval and pause via akka.watch.heartbeat.interval
and akka.watch.heartbeat.pause. This will most likely mitigate the problem
of death watch failures.

Will do. I think GC pauses are ephemeral given the nature of  couple of our
pipelines.

Thank you for looking into the other 2.  The ROCKSDB options stuff is
interesting and we have known that, but would dig deeper to make this per
pipeline. We do have sense of how much state is better kept in the memtable
to prevent a SSTable enquiry.

On Mon, Jan 29, 2018 at 12:13 PM, Till Rohrmann <tr...@apache.org>
wrote:

> Hi Vishal,
>
> Akka usually quarantines remote ActorSystems in case of a system message
> delivery failure or if the death watch was triggered. This can, for
> example, happen if your machine is under heavy load or has a high GC
> pressure and does not find enough time to respond to the heartbeats.
>
> - If you enable taskmanager.exit-on-fatal-akka-error, then it will stop
> TMs which got quarantined. This will automatically restart TMs in case that
> you are running Flink on Yarn. Thus, I would recommend enabling this if
> possible
> - The akka.ask.timeout parameter controls the timeout for remote messages.
> You should only increase this if you observe timeouts between the different
> Flink components. What you can change in order to account for heavy load or
> GC pauses is the heartbeat interval and pause via akka.watch.heartbeat.interval
> and akka.watch.heartbeat.pause. This will most likely mitigate the problem
> of death watch failures.
> - There is an effort to add resource specifications to Flink operators. It
> is not yet fully implemented but you can take a look at ResourceSpec to see
> what you can define for each operator. Once fully implemented, Flink will
> then make sure that each operator gets a slot with enough resources.
> - For RocksDB's resource consumption there aren't any Flink metrics yet.
> If you want to learn more about it's resource consumption please take a
> look here [1]. You can, though, configure the ColumnFamilyOptions by
> implementing an OptionsFactory. That way you can configure the memtable
> size which is allocated for each Flink state.
>
> [1] https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
> [2] https://github.com/facebook/rocksdb/wiki/Set-Up-Options
>
> Cheers,
> Till
>
> On Mon, Jan 29, 2018 at 4:05 PM, Vishal Santoshi <
> vishal.santoshi@gmail.com> wrote:
>
>> Thank you.
>>
>> On Mon, Jan 29, 2018 at 3:17 AM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Hi Vishal,
>>>
>>> sorry for the late response.
>>> Till (in CC) might be able to answer your Akka / coordination related
>>> questions.
>>>
>>> Best, Fabian
>>>
>>> 2018-01-24 1:22 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>>
>>>> Any suggestions ?  I know these are very general issue but these are
>>>> edge conditions that we want the community to give us general advise on ..
>>>>
>>>> On Sun, Jan 21, 2018 at 3:16 PM, Vishal Santoshi <
>>>> vishal.santoshi@gmail.com> wrote:
>>>>
>>>>> There have been a couple of instances where one of our TMs was
>>>>> quarantined ( the cause is irrelevant to this discussion ).  And we had to
>>>>> bounce the TM to bring back sanity to the cluster.  There have been
>>>>> discussions around and am trying to distill them. My questions are
>>>>>
>>>>>
>>>>> *  Based on https://issues.apache.org/jira/browse/FLINK-3347 is it
>>>>> advisable to set the taskmanager.exit-on-fatal-akka-error  to true.
>>>>> ?
>>>>>
>>>>> * Is the akka.ask.timeout relevant here ? We could increase the value
>>>>> to greater than 10s but based on your experiences is it more of a  "mask
>>>>> the issue" exercise or is 10s generally a low value that *should* be
>>>>> increased ?
>>>>>
>>>>> * Is it possible or is there some effort being put into per job
>>>>> memory/resource consumption for a multi job setup that is very normal with
>>>>> flink ?
>>>>>
>>>>> * Is there an effort to monitor ROCKSDB useage ( off heap and what not
>>>>> ) ? It seems a black box to a user as of today.
>>>>>
>>>>> Thank you and regards.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: AKA and quarantine

Posted by Till Rohrmann <tr...@apache.org>.
Hi Vishal,

Akka usually quarantines remote ActorSystems in case of a system message
delivery failure or if the death watch was triggered. This can, for
example, happen if your machine is under heavy load or has a high GC
pressure and does not find enough time to respond to the heartbeats.

- If you enable taskmanager.exit-on-fatal-akka-error, then it will stop TMs
which got quarantined. This will automatically restart TMs in case that you
are running Flink on Yarn. Thus, I would recommend enabling this if possible
- The akka.ask.timeout parameter controls the timeout for remote messages.
You should only increase this if you observe timeouts between the different
Flink components. What you can change in order to account for heavy load or
GC pauses is the heartbeat interval and pause
via akka.watch.heartbeat.interval and akka.watch.heartbeat.pause. This will
most likely mitigate the problem of death watch failures.
- There is an effort to add resource specifications to Flink operators. It
is not yet fully implemented but you can take a look at ResourceSpec to see
what you can define for each operator. Once fully implemented, Flink will
then make sure that each operator gets a slot with enough resources.
- For RocksDB's resource consumption there aren't any Flink metrics yet. If
you want to learn more about it's resource consumption please take a look
here [1]. You can, though, configure the ColumnFamilyOptions by
implementing an OptionsFactory. That way you can configure the memtable
size which is allocated for each Flink state.

[1] https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
[2] https://github.com/facebook/rocksdb/wiki/Set-Up-Options

Cheers,
Till

On Mon, Jan 29, 2018 at 4:05 PM, Vishal Santoshi <vi...@gmail.com>
wrote:

> Thank you.
>
> On Mon, Jan 29, 2018 at 3:17 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> sorry for the late response.
>> Till (in CC) might be able to answer your Akka / coordination related
>> questions.
>>
>> Best, Fabian
>>
>> 2018-01-24 1:22 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>
>>> Any suggestions ?  I know these are very general issue but these are
>>> edge conditions that we want the community to give us general advise on ..
>>>
>>> On Sun, Jan 21, 2018 at 3:16 PM, Vishal Santoshi <
>>> vishal.santoshi@gmail.com> wrote:
>>>
>>>> There have been a couple of instances where one of our TMs was
>>>> quarantined ( the cause is irrelevant to this discussion ).  And we had to
>>>> bounce the TM to bring back sanity to the cluster.  There have been
>>>> discussions around and am trying to distill them. My questions are
>>>>
>>>>
>>>> *  Based on https://issues.apache.org/jira/browse/FLINK-3347 is it
>>>> advisable to set the taskmanager.exit-on-fatal-akka-error  to true. ?
>>>>
>>>> * Is the akka.ask.timeout relevant here ? We could increase the value
>>>> to greater than 10s but based on your experiences is it more of a  "mask
>>>> the issue" exercise or is 10s generally a low value that *should* be
>>>> increased ?
>>>>
>>>> * Is it possible or is there some effort being put into per job
>>>> memory/resource consumption for a multi job setup that is very normal with
>>>> flink ?
>>>>
>>>> * Is there an effort to monitor ROCKSDB useage ( off heap and what not
>>>> ) ? It seems a black box to a user as of today.
>>>>
>>>> Thank you and regards.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>

Re: AKA and quarantine

Posted by Vishal Santoshi <vi...@gmail.com>.
Thank you.

On Mon, Jan 29, 2018 at 3:17 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Vishal,
>
> sorry for the late response.
> Till (in CC) might be able to answer your Akka / coordination related
> questions.
>
> Best, Fabian
>
> 2018-01-24 1:22 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>
>> Any suggestions ?  I know these are very general issue but these are edge
>> conditions that we want the community to give us general advise on ..
>>
>> On Sun, Jan 21, 2018 at 3:16 PM, Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>> There have been a couple of instances where one of our TMs was
>>> quarantined ( the cause is irrelevant to this discussion ).  And we had to
>>> bounce the TM to bring back sanity to the cluster.  There have been
>>> discussions around and am trying to distill them. My questions are
>>>
>>>
>>> *  Based on https://issues.apache.org/jira/browse/FLINK-3347 is it
>>> advisable to set the taskmanager.exit-on-fatal-akka-error  to true. ?
>>>
>>> * Is the akka.ask.timeout relevant here ? We could increase the value to
>>> greater than 10s but based on your experiences is it more of a  "mask the
>>> issue" exercise or is 10s generally a low value that *should* be
>>> increased ?
>>>
>>> * Is it possible or is there some effort being put into per job
>>> memory/resource consumption for a multi job setup that is very normal with
>>> flink ?
>>>
>>> * Is there an effort to monitor ROCKSDB useage ( off heap and what not )
>>> ? It seems a black box to a user as of today.
>>>
>>> Thank you and regards.
>>>
>>>
>>>
>>>
>>>
>>
>

Re: AKA and quarantine

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Vishal,

sorry for the late response.
Till (in CC) might be able to answer your Akka / coordination related
questions.

Best, Fabian

2018-01-24 1:22 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:

> Any suggestions ?  I know these are very general issue but these are edge
> conditions that we want the community to give us general advise on ..
>
> On Sun, Jan 21, 2018 at 3:16 PM, Vishal Santoshi <
> vishal.santoshi@gmail.com> wrote:
>
>> There have been a couple of instances where one of our TMs was
>> quarantined ( the cause is irrelevant to this discussion ).  And we had to
>> bounce the TM to bring back sanity to the cluster.  There have been
>> discussions around and am trying to distill them. My questions are
>>
>>
>> *  Based on https://issues.apache.org/jira/browse/FLINK-3347 is it
>> advisable to set the taskmanager.exit-on-fatal-akka-error  to true. ?
>>
>> * Is the akka.ask.timeout relevant here ? We could increase the value to
>> greater than 10s but based on your experiences is it more of a  "mask the
>> issue" exercise or is 10s generally a low value that *should* be
>> increased ?
>>
>> * Is it possible or is there some effort being put into per job
>> memory/resource consumption for a multi job setup that is very normal with
>> flink ?
>>
>> * Is there an effort to monitor ROCKSDB useage ( off heap and what not )
>> ? It seems a black box to a user as of today.
>>
>> Thank you and regards.
>>
>>
>>
>>
>>
>

Re: AKA and quarantine

Posted by Vishal Santoshi <vi...@gmail.com>.
Any suggestions ?  I know these are very general issue but these are edge
conditions that we want the community to give us general advise on ..

On Sun, Jan 21, 2018 at 3:16 PM, Vishal Santoshi <vi...@gmail.com>
wrote:

> There have been a couple of instances where one of our TMs was quarantined
> ( the cause is irrelevant to this discussion ).  And we had to bounce the
> TM to bring back sanity to the cluster.  There have been discussions around
> and am trying to distill them. My questions are
>
>
> *  Based on https://issues.apache.org/jira/browse/FLINK-3347 is it
> advisable to set the taskmanager.exit-on-fatal-akka-error  to true. ?
>
> * Is the akka.ask.timeout relevant here ? We could increase the value to
> greater than 10s but based on your experiences is it more of a  "mask the
> issue" exercise or is 10s generally a low value that *should* be
> increased ?
>
> * Is it possible or is there some effort being put into per job
> memory/resource consumption for a multi job setup that is very normal with
> flink ?
>
> * Is there an effort to monitor ROCKSDB useage ( off heap and what not ) ?
> It seems a black box to a user as of today.
>
> Thank you and regards.
>
>
>
>
>