Posted to user@flink.apache.org by 祁明良 <mq...@xiaohongshu.com> on 2018/08/12 16:36:03 UTC

Tuning checkpoint

Hi all,


I have several questions regarding checkpoints. The background: I'm using a ProcessFunction keyed by user_id, roughly like the following:

inputStream
  .keyBy(x => getUserKey(x))
  .process(...)

It runs on YARN with 40 TMs of 2 slots each. When I look at the checkpoint metrics, only a small number of subtasks have a large "alignment buffered/duration", and it looks like the 2 slots on the same TM are either both high or both low. What might cause this?

  1.  Maybe data skew, but the amount of data per subtask is almost the same.
  2.  Or the network?
  3.  The system is under back pressure, but I don't understand why only about 4 out of 80 subtasks behave like this.

Another question is about the alignment buffer. I thought it was only used for multiple input stream cases, but for a keyed process function, what is actually aligned?

The last question is about tuning RocksDB. I'm trying to assign some memory to the write buffer and block cache. The docs say to do this "typically by decreasing the JVM heap size of the TaskManagers by the same amount", but they also say about the TaskManager heap size: "On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value." So it looks like I should decrease the TaskManager heap, yet the value is set automatically by YARN. What should I do?

Best,

Mingliang

This communication may contain privileged or other confidential information of Red. If you have received it in error, please advise the sender by reply e-mail and immediately delete the message and any attachments without copying or disclosing the contents. Thank you.

Re: Tuning checkpoint

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Mingliang,

Regarding your first question: I answered it on Stack Overflow [1].
Hope it helps.

Best, Hequn

[1]
https://stackoverflow.com/questions/51832577/what-may-probably-cause-large-alignment-duration-for-flink-job



Re: Tuning checkpoint

Posted by 祁明良 <mq...@xiaohongshu.com>.
Thank you for this great answer, Fabian.

Regarding the YARN JVM heap size, I tried changing
containerized.heap-cutoff-ratio: 0.25
and it seems to be working, but the actual memory needed by RocksDB still looks like a black box to me. I see there's already a JIRA ticket about this problem [1], created last year and still open. All I can do is keep increasing this value until YARN stops killing my TaskManagers for their memory usage. :)

By the way, my rough calculation of RocksDB memory on each TM is:
number of slots per TM * number of stateful operators (including source and sink?) * (block cache size + write buffer size)

I suspect it's not correct, though.
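As a back-of-the-envelope sketch of that guess (the formula is only my guess, and every number below is hypothetical):

```python
# Back-of-the-envelope estimate of per-TM RocksDB memory (my guessed formula,
# not an official one) and of how much of a YARN container the heap cutoff
# reserves. All numbers are hypothetical.

def rocksdb_memory_mb(slots_per_tm, stateful_operators,
                      block_cache_mb, write_buffer_mb):
    # Assumes one block cache + write buffer per stateful operator per slot.
    return slots_per_tm * stateful_operators * (block_cache_mb + write_buffer_mb)

def heap_after_cutoff_mb(container_mb, cutoff_ratio):
    # Roughly mirrors containerized.heap-cutoff-ratio: the JVM heap is the
    # container size minus the cutoff left for off-heap users like RocksDB.
    return int(container_mb * (1 - cutoff_ratio))

est = rocksdb_memory_mb(slots_per_tm=2, stateful_operators=3,
                        block_cache_mb=64, write_buffer_mb=32)
reserved = 4096 - heap_after_cutoff_mb(4096, 0.25)
print(est)       # estimated RocksDB need in MB
print(reserved)  # MB the cutoff actually reserves in a 4 GB container
```

If the estimate comes out larger than the reserved amount, that would explain YARN killing the container.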

Best,
Mingliang

[1] https://issues.apache.org/jira/browse/FLINK-7289


Re: Tuning checkpoint

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Mingliang,

let me answer your second question first:

> Another question is about the alignment buffer. I thought it was only used
for multiple input stream cases, but for a keyed process function, what is
actually aligned?

When a task sends records to multiple downstream tasks (tasks, not
operators!) due to a broadcast or partition/keyBy/shuffle connection, the
task broadcasts each checkpoint barrier to all of its receiving tasks.
Therefore, each task that receives records from multiple tasks will receive
multiple checkpoint barriers. (Checkpoint barriers behave similarly to
watermarks in this regard.)
In order to provide exactly-once state consistency, a task must buffer
records from each input connection that has already forwarded a barrier
until barriers from all input connections have been received and the state
checkpoint has been initiated.

What does this mean for the long checkpoint alignment that you observe?
Checkpoint alignment starts when the first barrier is received and ends
when the last barrier is received.
Hence, it seems as if one task manager receives some barriers later than
the other nodes, probably because it is more heavily loaded.
The fact that all affected tasks run on the same TM, and that you mentioned
backpressure, points in that direction, because a TM multiplexes the
connections of all of its tasks.
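As a toy illustration (this is not Flink code; the channels, timestamps, and records are all made up): on a two-input task, once a channel has delivered its barrier, further records from that channel must be buffered, and the alignment duration is the gap between the first and the last barrier.

```python
# Toy simulation of checkpoint barrier alignment on a task with two input
# channels. Events are (timestamp, payload); "BARRIER" marks the barrier.

def align(channels):
    """Return (buffered_records, alignment_duration) for one checkpoint."""
    barrier_times = {}  # channel id -> time its barrier arrived
    buffered = []       # records held back during alignment
    # Merge all events in timestamp order, remembering their channel.
    events = sorted(
        (t, ch, payload)
        for ch, stream in enumerate(channels)
        for (t, payload) in stream
    )
    for t, ch, payload in events:
        if payload == "BARRIER":
            barrier_times[ch] = t
            if len(barrier_times) == len(channels):  # last barrier arrived
                break
        elif ch in barrier_times:
            # This channel already delivered its barrier, so records behind
            # it must be buffered until alignment completes.
            buffered.append(payload)
    duration = max(barrier_times.values()) - min(barrier_times.values())
    return buffered, duration

# Channel 0 is fast; channel 1 is slow (e.g. on a backpressured path).
ch0 = [(1, "a"), (2, "BARRIER"), (3, "b"), (4, "c")]
ch1 = [(1, "x"), (9, "BARRIER")]
buffered, duration = align([ch0, ch1])
print(buffered)  # records from the fast channel held during alignment
print(duration)  # last barrier time minus first barrier time
```

In this sketch the slow channel alone drives both the alignment duration and how much the fast channel must buffer, which matches seeing large values only on the loaded TM.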

Regarding the memory configuration question, I am not sure if there is a
way to override the JVM heap size on YARN. Maybe others can answer this
question.

Best,
Fabian
