You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Luis Alves <lm...@gmail.com> on 2018/03/28 09:44:18 UTC

Load Shedding

Hi,

As part of a project that I'm developing, I'm extending Flink 1.2 to
support load shedding. I'm doing some performance tests to check the
performance impact of my changes compared to Flink 1.2 release.

From the results that I'm getting, I can see that load shedding is working
and that incoming events are being dropped (the lag in the input Kafka
topic also remains ~0).

But when I look at the latency in Flink, it seems that when load shedding
triggers, the latency starts  growing to values above 5 seconds (I don't
see the same behavior on Flink 1.2. release). Before load shedding
triggers, the latency remains similar.

Looking at the git diff with the changes that I did on the application
runtime side, there's only one that is in the critical path of the
processing pipeline. In the RecordWriter.emit
<https://github.com/apache/flink/blob/066b66dd37030bb94bd179d7fb2280876be038c5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L105>
I simply add a condition that randomly skips the step of sending the record
to the target (sendToTarget) with a given probability (dropProbability <
random.nextDouble()).

Does adding those random drops have side effects on some other component in
Flink, causing the latencies to increase?

Thanks,
Luís Alves

Re: Load Shedding

Posted by Luis Alves <lm...@gmail.com>.
I could, but the objective is to have minimal changes on the user level.

Luís Alves

2018-03-28 16:45 GMT+01:00 Piotr Nowojski <pi...@data-artisans.com>:

> Yes, they are using randomEmit, so if you didn’t add this randomised
> records dropping in it, my remark would be invalid (and consistent with
> what Chesnay wrote).
>
> Besides questions asked by Chesnay, wouldn’t it be safer to implement
> records shedding on a user level in a form of randomly filtering operator?
>
> Piotrek
>
> > On 28 Mar 2018, at 15:49, Luis Alves <lm...@gmail.com> wrote:
> >
> > @Chesnay I tried both approaches, using the latency metric and manually
> by
> > adding a timestamp to each record.
> > @Piotr I can try to do the random drops in the RecordWriterOutput, but
> > don't the latency markers use the randomEmit method instead of emit?
> >
> > 2018-03-28 14:26 GMT+01:00 Chesnay Schepler <ch...@apache.org>:
> >
> >> My first instinct were latency markers as well, but AFAIK latency
> markers
> >> are self-contained; they contain the start timestamp from the source
> and we
> >> just measure the diff at each task. Thus, if the marker is dropped it
> >> shouldn't be visible in increased latency metrics, but they should just
> not
> >> be updated instead.
> >>
> >> How do you measure the latency to being with? Do you use the latency
> >> metric or do you measure it manually?
> >> If it is done manually couldn't this be due by dropping watermarks,
> >> causing window operations to take longer than expected?
> >>
> >>
> >> On 28.03.2018 14:39, Piotr Nowojski wrote:
> >>
> >>> Hi,
> >>>
> >>> If you have modified RecordWriter#randomEmit then maybe (probably?) the
> >>> reason is that you are accidentally skipping LatencyMarkers along side
> >>> records. You can track the code path of emitting LatencyMarkers from
> >>> Output#emitLatencyMarker.
> >>>
> >>> I haven’t thought that through, but maybe you should implement your
> >>> record shedding on a level of RecordWriterOutput (or all
> implementations of
> >>> the org.apache.flink.streaming.api.operators.Output?), because it’s
> >>> easier there to differentiate between normal records and
> LatencyMarkers.
> >>>
> >>> Piotrek
> >>>
> >>> On 28 Mar 2018, at 11:44, Luis Alves <lm...@gmail.com> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> As part of a project that I'm developing, I'm extending Flink 1.2 to
> >>>> support load shedding. I'm doing some performance tests to check the
> >>>> performance impact of my changes compared to Flink 1.2 release.
> >>>>
> >>>> From the results that I'm getting, I can see that load shedding is
> >>>> working
> >>>> and that incoming events are being dropped (the lag in the input Kafka
> >>>> topic also remains ~0).
> >>>>
> >>>> But when I look at the latency in Flink, it seems that when load
> shedding
> >>>> triggers, the latency starts  growing to values above 5 seconds (I
> don't
> >>>> see the same behavior on Flink 1.2. release). Before load shedding
> >>>> triggers, the latency remains similar.
> >>>>
> >>>> Looking at the git diff with the changes that I did on the application
> >>>> runtime side, there's only one that is in the critical path of the
> >>>> processing pipeline. In the RecordWriter.emit
> >>>> <https://github.com/apache/flink/blob/066b66dd37030bb94bd179
> >>>> d7fb2280876be038c5/flink-runtime/src/main/java/org/
> >>>> apache/flink/runtime/io/network/api/writer/RecordWriter.java#L105>
> >>>> I simply add a condition that randomly skips the step of sending the
> >>>> record
> >>>> to the target (sendToTarget) with a given probability
> (dropProbability <
> >>>> random.nextDouble()).
> >>>>
> >>>> Does adding those random drops have side effects on some other
> component
> >>>> in
> >>>> Flink, causing the latencies to increase?
> >>>>
> >>>> Thanks,
> >>>> Luís Alves
> >>>>
> >>>
> >>>
> >>
>
>

Re: Load Shedding

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Yes, they are using randomEmit, so if you didn’t add this randomised records dropping in it, my remark would be invalid (and consistent with what Chesnay wrote).

Besides questions asked by Chesnay, wouldn’t it be safer to implement records shedding on a user level in a form of randomly filtering operator?

Piotrek

> On 28 Mar 2018, at 15:49, Luis Alves <lm...@gmail.com> wrote:
> 
> @Chesnay I tried both approaches, using the latency metric and manually by
> adding a timestamp to each record.
> @Piotr I can try to do the random drops in the RecordWriterOutput, but
> don't the latency markers use the randomEmit method instead of emit?
> 
> 2018-03-28 14:26 GMT+01:00 Chesnay Schepler <ch...@apache.org>:
> 
>> My first instinct were latency markers as well, but AFAIK latency markers
>> are self-contained; they contain the start timestamp from the source and we
>> just measure the diff at each task. Thus, if the marker is dropped it
>> shouldn't be visible in increased latency metrics, but they should just not
>> be updated instead.
>> 
>> How do you measure the latency to being with? Do you use the latency
>> metric or do you measure it manually?
>> If it is done manually couldn't this be due by dropping watermarks,
>> causing window operations to take longer than expected?
>> 
>> 
>> On 28.03.2018 14:39, Piotr Nowojski wrote:
>> 
>>> Hi,
>>> 
>>> If you have modified RecordWriter#randomEmit then maybe (probably?) the
>>> reason is that you are accidentally skipping LatencyMarkers along side
>>> records. You can track the code path of emitting LatencyMarkers from
>>> Output#emitLatencyMarker.
>>> 
>>> I haven’t thought that through, but maybe you should implement your
>>> record shedding on a level of RecordWriterOutput (or all implementations of
>>> the org.apache.flink.streaming.api.operators.Output?), because it’s
>>> easier there to differentiate between normal records and LatencyMarkers.
>>> 
>>> Piotrek
>>> 
>>> On 28 Mar 2018, at 11:44, Luis Alves <lm...@gmail.com> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> As part of a project that I'm developing, I'm extending Flink 1.2 to
>>>> support load shedding. I'm doing some performance tests to check the
>>>> performance impact of my changes compared to Flink 1.2 release.
>>>> 
>>>> From the results that I'm getting, I can see that load shedding is
>>>> working
>>>> and that incoming events are being dropped (the lag in the input Kafka
>>>> topic also remains ~0).
>>>> 
>>>> But when I look at the latency in Flink, it seems that when load shedding
>>>> triggers, the latency starts  growing to values above 5 seconds (I don't
>>>> see the same behavior on Flink 1.2. release). Before load shedding
>>>> triggers, the latency remains similar.
>>>> 
>>>> Looking at the git diff with the changes that I did on the application
>>>> runtime side, there's only one that is in the critical path of the
>>>> processing pipeline. In the RecordWriter.emit
>>>> <https://github.com/apache/flink/blob/066b66dd37030bb94bd179
>>>> d7fb2280876be038c5/flink-runtime/src/main/java/org/
>>>> apache/flink/runtime/io/network/api/writer/RecordWriter.java#L105>
>>>> I simply add a condition that randomly skips the step of sending the
>>>> record
>>>> to the target (sendToTarget) with a given probability (dropProbability <
>>>> random.nextDouble()).
>>>> 
>>>> Does adding those random drops have side effects on some other component
>>>> in
>>>> Flink, causing the latencies to increase?
>>>> 
>>>> Thanks,
>>>> Luís Alves
>>>> 
>>> 
>>> 
>> 


Re: Load Shedding

Posted by Luis Alves <lm...@gmail.com>.
@Chesnay I tried both approaches, using the latency metric and manually by
adding a timestamp to each record.
@Piotr I can try to do the random drops in the RecordWriterOutput, but
don't the latency markers use the randomEmit method instead of emit?

2018-03-28 14:26 GMT+01:00 Chesnay Schepler <ch...@apache.org>:

> My first instinct were latency markers as well, but AFAIK latency markers
> are self-contained; they contain the start timestamp from the source and we
> just measure the diff at each task. Thus, if the marker is dropped it
> shouldn't be visible in increased latency metrics, but they should just not
> be updated instead.
>
> How do you measure the latency to being with? Do you use the latency
> metric or do you measure it manually?
> If it is done manually couldn't this be due by dropping watermarks,
> causing window operations to take longer than expected?
>
>
> On 28.03.2018 14:39, Piotr Nowojski wrote:
>
>> Hi,
>>
>> If you have modified RecordWriter#randomEmit then maybe (probably?) the
>> reason is that you are accidentally skipping LatencyMarkers along side
>> records. You can track the code path of emitting LatencyMarkers from
>> Output#emitLatencyMarker.
>>
>> I haven’t thought that through, but maybe you should implement your
>> record shedding on a level of RecordWriterOutput (or all implementations of
>> the org.apache.flink.streaming.api.operators.Output?), because it’s
>> easier there to differentiate between normal records and LatencyMarkers.
>>
>> Piotrek
>>
>> On 28 Mar 2018, at 11:44, Luis Alves <lm...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> As part of a project that I'm developing, I'm extending Flink 1.2 to
>>> support load shedding. I'm doing some performance tests to check the
>>> performance impact of my changes compared to Flink 1.2 release.
>>>
>>>  From the results that I'm getting, I can see that load shedding is
>>> working
>>> and that incoming events are being dropped (the lag in the input Kafka
>>> topic also remains ~0).
>>>
>>> But when I look at the latency in Flink, it seems that when load shedding
>>> triggers, the latency starts  growing to values above 5 seconds (I don't
>>> see the same behavior on Flink 1.2. release). Before load shedding
>>> triggers, the latency remains similar.
>>>
>>> Looking at the git diff with the changes that I did on the application
>>> runtime side, there's only one that is in the critical path of the
>>> processing pipeline. In the RecordWriter.emit
>>> <https://github.com/apache/flink/blob/066b66dd37030bb94bd179
>>> d7fb2280876be038c5/flink-runtime/src/main/java/org/
>>> apache/flink/runtime/io/network/api/writer/RecordWriter.java#L105>
>>> I simply add a condition that randomly skips the step of sending the
>>> record
>>> to the target (sendToTarget) with a given probability (dropProbability <
>>> random.nextDouble()).
>>>
>>> Does adding those random drops have side effects on some other component
>>> in
>>> Flink, causing the latencies to increase?
>>>
>>> Thanks,
>>> Luís Alves
>>>
>>
>>
>

Re: Load Shedding

Posted by Chesnay Schepler <ch...@apache.org>.
My first instinct were latency markers as well, but AFAIK latency 
markers are self-contained; they contain the start timestamp from the 
source and we just measure the diff at each task. Thus, if the marker is 
dropped it shouldn't be visible in increased latency metrics, but they 
should just not be updated instead.

How do you measure the latency to being with? Do you use the latency 
metric or do you measure it manually?
If it is done manually couldn't this be due by dropping watermarks, 
causing window operations to take longer than expected?

On 28.03.2018 14:39, Piotr Nowojski wrote:
> Hi,
>
> If you have modified RecordWriter#randomEmit then maybe (probably?) the reason is that you are accidentally skipping LatencyMarkers along side records. You can track the code path of emitting LatencyMarkers from Output#emitLatencyMarker.
>
> I haven’t thought that through, but maybe you should implement your record shedding on a level of RecordWriterOutput (or all implementations of the org.apache.flink.streaming.api.operators.Output?), because it’s easier there to differentiate between normal records and LatencyMarkers.
>
> Piotrek
>
>> On 28 Mar 2018, at 11:44, Luis Alves <lm...@gmail.com> wrote:
>>
>> Hi,
>>
>> As part of a project that I'm developing, I'm extending Flink 1.2 to
>> support load shedding. I'm doing some performance tests to check the
>> performance impact of my changes compared to Flink 1.2 release.
>>
>>  From the results that I'm getting, I can see that load shedding is working
>> and that incoming events are being dropped (the lag in the input Kafka
>> topic also remains ~0).
>>
>> But when I look at the latency in Flink, it seems that when load shedding
>> triggers, the latency starts  growing to values above 5 seconds (I don't
>> see the same behavior on Flink 1.2. release). Before load shedding
>> triggers, the latency remains similar.
>>
>> Looking at the git diff with the changes that I did on the application
>> runtime side, there's only one that is in the critical path of the
>> processing pipeline. In the RecordWriter.emit
>> <https://github.com/apache/flink/blob/066b66dd37030bb94bd179d7fb2280876be038c5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L105>
>> I simply add a condition that randomly skips the step of sending the record
>> to the target (sendToTarget) with a given probability (dropProbability <
>> random.nextDouble()).
>>
>> Does adding those random drops have side effects on some other component in
>> Flink, causing the latencies to increase?
>>
>> Thanks,
>> Luís Alves
>


Re: Load Shedding

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

If you have modified RecordWriter#randomEmit then maybe (probably?) the reason is that you are accidentally skipping LatencyMarkers along side records. You can track the code path of emitting LatencyMarkers from Output#emitLatencyMarker.

I haven’t thought that through, but maybe you should implement your record shedding on a level of RecordWriterOutput (or all implementations of the org.apache.flink.streaming.api.operators.Output?), because it’s easier there to differentiate between normal records and LatencyMarkers.

Piotrek

> On 28 Mar 2018, at 11:44, Luis Alves <lm...@gmail.com> wrote:
> 
> Hi,
> 
> As part of a project that I'm developing, I'm extending Flink 1.2 to
> support load shedding. I'm doing some performance tests to check the
> performance impact of my changes compared to Flink 1.2 release.
> 
> From the results that I'm getting, I can see that load shedding is working
> and that incoming events are being dropped (the lag in the input Kafka
> topic also remains ~0).
> 
> But when I look at the latency in Flink, it seems that when load shedding
> triggers, the latency starts  growing to values above 5 seconds (I don't
> see the same behavior on Flink 1.2. release). Before load shedding
> triggers, the latency remains similar.
> 
> Looking at the git diff with the changes that I did on the application
> runtime side, there's only one that is in the critical path of the
> processing pipeline. In the RecordWriter.emit
> <https://github.com/apache/flink/blob/066b66dd37030bb94bd179d7fb2280876be038c5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L105>
> I simply add a condition that randomly skips the step of sending the record
> to the target (sendToTarget) with a given probability (dropProbability <
> random.nextDouble()).
> 
> Does adding those random drops have side effects on some other component in
> Flink, causing the latencies to increase?
> 
> Thanks,
> Luís Alves