Posted to user@flink.apache.org by Felipe Gutierrez <fe...@gmail.com> on 2019/11/05 08:57:55 UTC

How can I get the backpressure signals inside my function or operator?

Hi all,

let's say that I have a "source -> map -> preAggregate -> keyBy -> reduce
-> sink" job and the reducer is sending backpressure signals to the
preAggregate, map and source operator. How do I get those signals inside my
operator's implementation?
I guess inside the function is not possible. But if I have my own operator
implemented (preAggregate) can I get those backpressure signals?

I want to read the "Shuffle.Netty.Input.Buffers.inputQueueLength" metric
[1] in my preAggregate operator in order to decide when to stop
pre-aggregating and flush tuples, and when to keep pre-aggregating. It is
something like the "credit based control on the network stack" [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service
[2] https://www.youtube.com/watch?v=AbqatHF3tZI

Thanks!
Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*
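
The flush-or-keep-aggregating decision described above can be sketched independently of where the signal comes from. This is only a minimal illustration, not Flink code: `PreAggregateBuffer`, `maxKeys` and the `backpressured` flag are hypothetical names, and the `BiConsumer` stands in for the operator's output.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Hypothetical pre-aggregation buffer: sums values per key and decides on
 * every record whether to flush or to keep aggregating.
 */
class PreAggregateBuffer {
    private final Map<String, Long> partialSums = new HashMap<>();
    private final int maxKeys;                          // hard cap so the buffer cannot grow without bound
    private final BiConsumer<String, Long> downstream;  // stands in for the operator's output

    PreAggregateBuffer(int maxKeys, BiConsumer<String, Long> downstream) {
        this.maxKeys = maxKeys;
        this.downstream = downstream;
    }

    /**
     * @param backpressured whatever signal the operator managed to obtain
     *                      (an assumption; the thread discusses several sources)
     */
    void add(String key, long value, boolean backpressured) {
        partialSums.merge(key, value, Long::sum);
        // Downstream keeps up: flush right away to keep latency low.
        // Downstream is slow: keep combining, but never beyond maxKeys.
        if (!backpressured || partialSums.size() >= maxKeys) {
            flush();
        }
    }

    /** Emits every partial sum and empties the buffer. */
    void flush() {
        partialSums.forEach(downstream);
        partialSums.clear();
    }

    int bufferedKeys() {
        return partialSums.size();
    }
}
```

The cap on buffered keys is a design choice of this sketch, so that a long backpressure phase cannot make the operator's memory grow indefinitely.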

Re: How can I get the backpressure signals inside my function or operator?

Posted by Felipe Gutierrez <fe...@gmail.com>.
Hmm, that is another possibility. Thanks for your suggestion!

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*


On Thu, Nov 7, 2019 at 10:41 PM Yuval Itzchakov <yu...@gmail.com> wrote:

> Hi,
>
> We've been dealing with a similar problem of downstream consumers causing
> backpressure. One idea that a colleague of mine suggested is measuring the
> time it takes to call Collector[T].collect. Since this method is used to push
> the records downstream, it will also actively block in case the buffer is
> full and there are no more floating buffers to allocate, hence causing the
> backpressure.
>
> Thus, if you know the average time a call to this function takes
> when there's no backpressure, you can make an educated guess about whether
> there is pressure by comparing it with the time it takes now (you'll need to
> measure these times in your source/operator), and actively reduce the number
> of records being pushed downstream.
>
> Yuval.

Re: How can I get the backpressure signals inside my function or operator?

Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi,

We've been dealing with a similar problem of downstream consumers causing
backpressure. One idea that a colleague of mine suggested is measuring the
time it takes to call Collector[T].collect. Since this method is used to push
the records downstream, it will also actively block in case the buffer is
full and there are no more floating buffers to allocate, hence causing the
backpressure.

Thus, if you know the average time a call to this function takes
when there's no backpressure, you can make an educated guess about whether
there is pressure by comparing it with the time it takes now (you'll need to
measure these times in your source/operator), and actively reduce the number
of records being pushed downstream.

Yuval.
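
A rough sketch of that timing idea, kept free of Flink classes so it can be tried in isolation. `EmitLatencyMonitor`, the baseline, the slow-down factor and the smoothing factor are all illustrative assumptions; in an operator the wrapped `Consumer` would be something like `record -> out.collect(record)`.

```java
import java.util.function.Consumer;

/**
 * Wraps an emit call and keeps an exponential moving average (EMA) of how
 * long each call takes, to compare against a no-backpressure baseline.
 */
class EmitLatencyMonitor<T> {
    private final Consumer<T> sink;
    private final double baselineNanos; // average call time measured without backpressure
    private final double slowFactor;    // e.g. 10.0 means "10x slower than the baseline"
    private final double alpha;         // EMA smoothing factor in (0, 1]
    private double emaNanos;

    EmitLatencyMonitor(Consumer<T> sink, double baselineNanos, double slowFactor, double alpha) {
        this.sink = sink;
        this.baselineNanos = baselineNanos;
        this.slowFactor = slowFactor;
        this.alpha = alpha;
        this.emaNanos = baselineNanos;
    }

    /** Emits one element and folds the elapsed time into the average. */
    void emit(T element) {
        long start = System.nanoTime();
        sink.accept(element); // expected to block while no output buffer is free
        observe(System.nanoTime() - start);
    }

    /** Folds one observed latency into the moving average. */
    void observe(long elapsedNanos) {
        emaNanos = alpha * elapsedNanos + (1 - alpha) * emaNanos;
    }

    boolean looksBackpressured() {
        return emaNanos > slowFactor * baselineNanos;
    }
}
```

The moving average is there because single calls can be slow for unrelated reasons (GC pauses, JIT warm-up); the thresholds would need tuning per job.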

On Thu, 7 Nov 2019, 9:17 Felipe Gutierrez, <fe...@gmail.com>
wrote:

> cool! I got to use it.
> Now I have to get the job ID and vertex ID inside the operator.
>
> I forgot to mention. I am using Flink 1.9.1
>
> Thanks!
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*

Re: How can I get the backpressure signals inside my function or operator?

Posted by Felipe Gutierrez <fe...@gmail.com>.
cool! I got to use it.
Now I have to get the job ID and vertex ID inside the operator.

I forgot to mention. I am using Flink 1.9.1

Thanks!
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*


On Thu, Nov 7, 2019 at 4:59 AM Zhijiang <wa...@aliyun.com> wrote:

> You can refer to this document [1] for the REST API details.
> Actually, the backpressure URI is
> "/jobs/:jobid/vertices/:vertexid/backpressure". But I am not sure whether
> it is easy to get the jobid and vertexid.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html
>
> Best,
> Zhijiang

Re: How can I get the backpressure signals inside my function or operator?

Posted by Zhijiang <wa...@aliyun.com>.
You can refer to this document [1] for the REST API details.
Actually, the backpressure URI is "/jobs/:jobid/vertices/:vertexid/backpressure". But I am not sure whether it is easy to get the jobid and vertexid.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html

Best,
Zhijiang
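
As a hedged illustration of using that endpoint from plain Java: the path comes from the message above, while the host/port, the helper names and the assumption that the JSON body carries a numeric "ratio" field per subtask are mine and should be checked against the REST API document [1].

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class BackPressureRest {
    // Assumed response shape: each subtask entry contains "ratio": <number>.
    private static final Pattern RATIO =
            Pattern.compile("\"ratio\"\\s*:\\s*([0-9.]+(?:[eE][-+]?[0-9]+)?)");

    /** Builds the URL for the path quoted in the thread. */
    static String backPressureUrl(String baseUrl, String jobId, String vertexId) {
        return baseUrl + "/jobs/" + jobId + "/vertices/" + vertexId + "/backpressure";
    }

    /** Plain HTTP GET that returns the response body as a string. */
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                body.write(chunk, 0, n);
            }
            return new String(body.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    /** Highest "ratio" value found in the body, or -1 if there is none. */
    static double maxRatio(String json) {
        double max = -1.0;
        Matcher m = RATIO.matcher(json);
        while (m.find()) {
            max = Math.max(max, Double.parseDouble(m.group(1)));
        }
        return max;
    }
}
```

A caller would do something like `maxRatio(fetch(backPressureUrl("http://localhost:8081", jobId, vertexId)))`, where the base URL is a placeholder. A real client would use a JSON parser instead of a regular expression; the regex only keeps the sketch dependency-free.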
------------------------------------------------------------------
From:Felipe Gutierrez <fe...@gmail.com>
Send Time:2019 Nov. 7 (Thu.) 00:06
To:Chesnay Schepler <ch...@apache.org>
Cc:Zhijiang <wa...@aliyun.com>; user <us...@flink.apache.org>
Subject:Re: How can I get the backpressure signals inside my function or operator?

If I can trigger the sample via the REST API, that is good enough for a POC. Then I can read from any in-memory storage using a separate thread within the operator. But which REST API call gives me the backpressure ratio?

Thanks
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com

Re: How can I get the backpressure signals inside my function or operator?

Posted by Felipe Gutierrez <fe...@gmail.com>.
If I can trigger the sample via the REST API, that is good enough for a POC.
Then I can read from any in-memory storage using a separate thread within
the operator. But which REST API call gives me the backpressure ratio?

Thanks
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*
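
The "separate thread within the operator" part could look roughly like the sketch below. It assumes nothing about where the value comes from: `BackPressureSignal` is a hypothetical name and the `DoubleSupplier` stands in for whatever reads the latest ratio (an in-memory store, a REST call, ...).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;

/**
 * Polls a ratio in the background and exposes the latest value to the
 * record-processing thread through a volatile field.
 */
class BackPressureSignal implements AutoCloseable {
    private final DoubleSupplier source;  // assumption: returns the latest backpressure ratio
    private final double threshold;       // ratio at or above which we call it backpressure
    private volatile double latestRatio;  // written by the poller, read by the operator thread
    private final ScheduledExecutorService poller =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "backpressure-poller");
                t.setDaemon(true); // never keep the JVM alive because of the poller
                return t;
            });

    BackPressureSignal(DoubleSupplier source, double threshold) {
        this.source = source;
        this.threshold = threshold;
    }

    /** Starts periodic polling; would be called once, e.g. when the operator opens. */
    void start(long periodMillis) {
        poller.scheduleAtFixedRate(this::pollOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** One poll; failures keep the previous value so the schedule survives. */
    void pollOnce() {
        try {
            latestRatio = source.getAsDouble();
        } catch (RuntimeException e) {
            // Deliberately ignored: a failed poll must not cancel future polls.
        }
    }

    boolean isBackpressured() {
        return latestRatio >= threshold;
    }

    @Override
    public void close() {
        poller.shutdownNow();
    }
}
```

The record-processing path only reads a volatile field, so it never blocks on the polling itself.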


On Wed, Nov 6, 2019 at 4:55 PM Chesnay Schepler <ch...@apache.org> wrote:

> I don't think there is a truly sane way to do this.
>
> I could envision a separate application triggering samples via the REST
> API, writing the results into Kafka which your operator can read. This is
> probably the most reasonable solution I can come up with.
>
> Any attempt at accessing the TaskExecutor or metrics from within the
> operator is inadvisable; you'd be encroaching into truly hacky territory.
>
> You could also do your own backpressure sampling within your operator
> (separate thread within the operator executing the same sampling logic),
> but I don't know how easy it would be to re-use Flink code.
>
> On 06/11/2019 13:40, Felipe Gutierrez wrote:
>
> Does anyone know which metric I can rely on to know if a given operator
> is triggering backpressure?
> Or how can I call the same Java object that the Flink UI calls to give me
> the backpressure ratio?
>
> Thanks,
> Felipe
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez *
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Tue, Nov 5, 2019 at 4:15 PM Felipe Gutierrez <
> felipe.o.gutierrez@gmail.com> wrote:
>
>> Hi Zhijiang,
>>
>> thanks for your reply. Yes, you understood correctly.
>> The fact that I cannot get "Shuffle.Netty.Input.Buffers.inputQueueLength"
>> in the operator might be because of the way the Flink runtime architecture
>> was designed. But I was wondering what kind of signal I can get. I guess I
>> could get some backpressure message, since backpressure works by slowing
>> down the upstream operators.
>>
>> For example, I can see the ratio per sub-task on the web interface [1],
>> that is, per physical operator. Is there any message flowing backward that
>> I can get? Is there anything that lets me avoid relying on some
>> external storage?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#sampling-threads
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez *
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com>*
>>
>>
>> On Tue, Nov 5, 2019 at 12:23 PM Zhijiang <wa...@aliyun.com>
>> wrote:
>>
>>> Hi Felipe,
>>>
>>> That is an interesting idea to control the upstream's output based on
>>> downstream's input.
>>>
>>> If I understood correctly, the preAggregate operator would flush its
>>> output while the reduce operator is idle/hungry. In contrast, the preAggregate
>>> would continue aggregating data in the case of back pressure.
>>>
>>> I think this requirement is valid, but unfortunately I guess you cannot
>>> get the back pressure signal at the operator level. AFAIK only the upper
>>> task level can get the input/output state to decide whether to process or
>>> not.
>>>
>>> If you want to get the reduce's metric of `Shuffle.Netty.Input.Buffers.inputQueueLength`
>>> on preAggregate side, you might rely on some external metric reporter
>>> to query it if possible.
>>>
>>> Best,
>>> Zhijiang
>>>
>>> ------------------------------------------------------------------
>>> From:Felipe Gutierrez <fe...@gmail.com>
>>> Send Time:2019 Nov. 5 (Tue.) 16:58
>>> To:user <us...@flink.apache.org>
>>> Subject:How can I get the backpressure signals inside my function or
>>> operator?
>>>
>>> Hi all,
>>>
>>> let's say that I have a "source -> map -> preAggregate -> keyBy ->
>>> reduce -> sink" job and the reducer is sending backpressure signals to the
>>> preAggregate, map and source operator. How do I get those signals inside my
>>> operator's implementation?
>>> I guess inside the function is not possible. But if I have my own
>>> operator implemented (preAggregate) can I get those backpressure signals?
>>>
>>> I want to read the
>>> "Shuffle.Netty.Input.Buffers.inputQueueLength" metric [1] in my preAggregate
>>> operator in order to decide when to stop pre-aggregating and flush
>>> tuples, and when to keep pre-aggregating. It is something like the "credit
>>> based control on the network stack" [2].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service
>>> [2] https://www.youtube.com/watch?v=AbqatHF3tZI
>>>
>>> Thanks!
>>> Felipe
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez *
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>>
>>>
>

Re: How can I get the backpressure signals inside my function or operator?

Posted by Chesnay Schepler <ch...@apache.org>.
I don't think there is a truly sane way to do this.

I could envision a separate application triggering samples via the REST
API, writing the results into Kafka which your operator can read. This
is probably the most reasonable solution I can come up with.

Any attempt at accessing the TaskExecutor or metrics from within the
operator is inadvisable; you'd be encroaching into truly hacky territory.

You could also do your own backpressure sampling within your operator 
(separate thread within the operator executing the same sampling logic), 
but I don't know how easy it would be to re-use Flink code.
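
A rough sketch of what such in-operator sampling could look like, without reusing any Flink code: take periodic stack traces of the task thread and report the fraction in which it sits inside a blocking buffer request. The class `StackSampler` is hypothetical, and the frame to look for is an assumption passed in by the caller (in Flink 1.9 the built-in sampler appears to check for `LocalBufferPool#requestBufferBuilderBlocking`, but verify that against your version):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: estimates how often a thread is stuck in a given method. */
final class StackSampler {

    /** True if any frame of the trace is the given class and method. */
    static boolean isBlockedIn(StackTraceElement[] trace, String className, String methodName) {
        for (StackTraceElement frame : trace) {
            if (frame.getClassName().equals(className)
                    && frame.getMethodName().equals(methodName)) {
                return true;
            }
        }
        return false;
    }

    /** Fraction of samples that contain the blocking frame; 0.0 for no samples. */
    static double ratio(List<StackTraceElement[]> samples, String className, String methodName) {
        if (samples.isEmpty()) {
            return 0.0;
        }
        int hits = 0;
        for (StackTraceElement[] trace : samples) {
            if (isBlockedIn(trace, className, methodName)) {
                hits++;
            }
        }
        return (double) hits / samples.size();
    }

    /** Samples the target thread numSamples times, sleeping delayMillis between samples. */
    static double sample(Thread target, int numSamples, long delayMillis,
                         String className, String methodName) throws InterruptedException {
        List<StackTraceElement[]> samples = new ArrayList<>();
        for (int i = 0; i < numSamples; i++) {
            samples.add(target.getStackTrace());
            Thread.sleep(delayMillis);
        }
        return ratio(samples, className, methodName);
    }
}
```

`sample(...)` would have to run on a separate thread so that it does not block record processing, and the operator would read the latest ratio from a volatile field. Unlike the built-in sampler, this sketch scans the whole stack rather than only the top frames.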


On 06/11/2019 13:40, Felipe Gutierrez wrote:
> Does anyone know which metric I can rely on to know if a given
> operator is triggering backpressure?
> Or how can I call the same Java object that the Flink UI calls to give
> me the backpressure ratio?
>
> Thanks,
> Felipe


Re: How can I get the backpressure signals inside my function or operator?

Posted by Felipe Gutierrez <fe...@gmail.com>.
Does anyone know which metric I can rely on to know if a given operator
is triggering backpressure?
Or how can I call the same Java object that the Flink UI calls to give me
the backpressure ratio?

Thanks,
Felipe



On Tue, Nov 5, 2019 at 4:15 PM Felipe Gutierrez <
felipe.o.gutierrez@gmail.com> wrote:

> Hi Zhijiang,
>
> thanks for your reply. Yes, you understood correctly.
> The fact that I cannot get "Shuffle.Netty.Input.Buffers.inputQueueLength"
> on the operator might be due to the way the Flink runtime architecture was
> designed. But I was wondering what kind of signal I can get. I guess there
> must be some backpressure message I could read, because backpressure works
> by slowing down the upstream operators.
>
> For example, I can see the ratio per sub-task on the web interface [1],
> that is, per physical operator. Is there any message flowing backward that
> I can get? Is there any way to avoid relying on some external storage?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#sampling-threads

Re: How can I get the backpressure signals inside my function or operator?

Posted by Felipe Gutierrez <fe...@gmail.com>.
Hi Zhijiang,

thanks for your reply. Yes, you understood correctly.
The fact that I cannot get "Shuffle.Netty.Input.Buffers.inputQueueLength"
on the operator might be due to the way the Flink runtime architecture was
designed. But I was wondering what kind of signal I can get. I guess there
must be some backpressure message I could read, because backpressure works
by slowing down the upstream operators.

For example, I can see the ratio per sub-task on the web interface [1],
that is, per physical operator. Is there any message flowing backward that I
can get? Is there any way to avoid relying on some external storage?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#sampling-threads


On Tue, Nov 5, 2019 at 12:23 PM Zhijiang <wa...@aliyun.com> wrote:

> Hi Felipe,
>
> That is an interesting idea to control the upstream's output based on
> downstream's input.
>
> If I understood correctly, the preAggregate operator would trigger flush
> output while the reduce operator is idle/hungry. In contrast, the preAggregate
> would continue aggregating data in the case of back pressure.
>
> I think this requirement is valid, but unfortunately I guess you cannot
> get the back pressure signal at the operator level. AFAIK only the upper
> task level can get the input/output state to decide whether to process or
> not.
>
> If you want to get the reduce's metric of `Shuffle.Netty.Input.Buffers.inputQueueLength`
> on preAggregate side, you might rely on some external metric reporter to
> query it if possible.
>
> Best,
> Zhijiang

Re: How can I get the backpressure signals inside my function or operator?

Posted by Zhijiang <wa...@aliyun.com>.
Hi Felipe,

That is an interesting idea to control the upstream's output based on downstream's input.

If I understood correctly, the preAggregate operator would trigger flush output while the reduce operator is idle/hungry. In contrast, the preAggregate would continue aggregating data in the case of back pressure.
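
To make that policy concrete, here is a minimal sketch in plain Java, outside of any Flink operator API. The class `PreAggregateBuffer`, the `backPressureRatio` argument, the threshold and the key limit are all hypothetical; how the operator obtains the ratio is exactly the open question of this thread, so it is simply passed in by the caller:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical pre-aggregation buffer: sums values per key until told to flush. */
final class PreAggregateBuffer {

    private final Map<String, Long> sums = new HashMap<>();
    private final double ratioThreshold; // below this, downstream counts as idle
    private final int maxKeys;           // safety bound so the buffer cannot grow forever

    PreAggregateBuffer(double ratioThreshold, int maxKeys) {
        this.ratioThreshold = ratioThreshold;
        this.maxKeys = maxKeys;
    }

    /**
     * Adds one record. Returns the partial aggregates to emit downstream,
     * or an empty map if the buffer keeps aggregating.
     */
    Map<String, Long> add(String key, long value, double backPressureRatio) {
        sums.merge(key, value, Long::sum);
        boolean downstreamIdle = backPressureRatio < ratioThreshold;
        boolean bufferFull = sums.size() >= maxKeys;
        if (downstreamIdle || bufferFull) {
            Map<String, Long> flushed = new HashMap<>(sums);
            sums.clear();
            return flushed;
        }
        return Collections.emptyMap();
    }
}
```

The `maxKeys` bound is an addition to the policy described above: without it, sustained back pressure would let the buffer grow without limit.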

I think this requirement is valid, but unfortunately I guess you cannot get the back pressure signal at the operator level. AFAIK only the upper task level can get the input/output state to decide whether to process or not.

If you want to get the reduce's metric of `Shuffle.Netty.Input.Buffers.inputQueueLength` on preAggregate side, you might rely on some external metric reporter to query it if possible.

Best,
Zhijiang


------------------------------------------------------------------
From:Felipe Gutierrez <fe...@gmail.com>
Send Time:2019 Nov. 5 (Tue.) 16:58
To:user <us...@flink.apache.org>
Subject:How can I get the backpressure signals inside my function or operator?

Hi all,

Let's say that I have a "source -> map -> preAggregate -> keyBy -> reduce -> sink" job and the reducer is sending backpressure signals to the preAggregate, map and source operators. How do I get those signals inside my operator's implementation?
I guess it is not possible inside the function. But if I implement my own operator (preAggregate), can I get those backpressure signals?

I want to read the "Shuffle.Netty.Input.Buffers.inputQueueLength" metric [1] in my preAggregate operator in order to decide when to stop pre-aggregating and flush tuples, and when to keep pre-aggregating. It is something like the "credit based control on the network stack" [2].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service
[2] https://www.youtube.com/watch?v=AbqatHF3tZI

Thanks!
Felipe