Posted to user@flink.apache.org by Paolo Cristofanelli <cr...@gmail.com> on 2017/07/17 10:42:49 UTC

Latency Measurement

Hi,

I would like to understand how to measure the latency of a record.
I have set up a simple project with a Kafka consumer that reads from a
topic and performs a simple map (with a thread sleep inside).

In order to measure the latency of this mapper I have added
env.getConfig().setLatencyTrackingInterval(10);

After that, I was planning to access the latency through the web UI,
but the related graph does not show any values.
I do not understand why. I was expecting the graph to show at least
the sleep duration.

I also have another question:

I am using a count window, aggregating every 100 input records and then I
perform a map. I want to see the latency as the difference between the time
at which the output record is emitted and the arrival time of the earliest
input record.

For example, the first value arrives at time x. By time x+5 all 100 values
have arrived and the system can aggregate them. I then perform the map
operation and the output record is emitted at time x+15.
I would like to obtain 15 as latency.
Do you have any suggestion on how to proceed?

Thanks for your time,
Paolo Cristofanelli
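
The measurement described in the second question can be sketched independently of Flink's built-in latency tracking: record each input's arrival time, keep the minimum across the 100-record window, and subtract it from the time at which the output is emitted. The following is only a minimal plain-Java illustration under those assumptions. The class and method names (CountWindowLatency, onRecord, onEmit) are hypothetical and are not part of the Flink API, and timestamps are passed in explicitly rather than read from a clock.

```java
/** Hypothetical helper (not Flink API): end-to-end latency of one count window. */
public class CountWindowLatency {
    private final int windowSize;
    private int count = 0;
    private long earliestArrival = Long.MAX_VALUE;

    public CountWindowLatency(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    /** Registers the arrival of one input record; returns true once the window is full. */
    public boolean onRecord(long arrivalTime) {
        earliestArrival = Math.min(earliestArrival, arrivalTime);
        count++;
        return count == windowSize;
    }

    /** Called when the output record is emitted; returns emitTime minus the earliest arrival, then resets. */
    public long onEmit(long emitTime) {
        if (count == 0) {
            throw new IllegalStateException("no records in the current window");
        }
        long latency = emitTime - earliestArrival;
        count = 0;
        earliestArrival = Long.MAX_VALUE;
        return latency;
    }

    public static void main(String[] args) {
        CountWindowLatency tracker = new CountWindowLatency(100);
        long x = 1000L; // arrival time of the first record
        boolean full = false;
        for (int i = 0; i < 100; i++) {
            // The 100 records arrive spread between x and x+5.
            full = tracker.onRecord(x + (i * 5) / 99);
        }
        // The aggregate passes through the map and is emitted at x+15.
        long latency = tracker.onEmit(x + 15);
        System.out.println("window full: " + full + ", latency: " + latency);
        // prints "window full: true, latency: 15"
    }
}
```

In a real job the same idea would presumably mean attaching the arrival timestamp to each record at the source and carrying the minimum through the window aggregate, so that the final map can compute the difference when it emits.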

Re: Latency Measurement

Posted by Chesnay Schepler <ch...@apache.org>.
I originally meant startNewChain(), but disableChaining() should work too.

Can you rerun the job with the logging level set to DEBUG, and check for 
any message from org.apache.flink.runtime.metrics?

Also looping in Robert, maybe he has an idea.

On 17.07.2017 14:23, Paolo Cristofanelli wrote:
> Hi Chesnay,
>
> thanks for your answer. I have not found the method createNewChain(), 
> I used instead disableChaining(), but with no effect:
>
>     DataStream<String> stream = env.addSource(
>         new FlinkKafkaConsumer08<>(
>             "MyTopic", new SimpleStringSchema(), properties));
>
>     stream.map(new ConsumerMap()).disableChaining();
>
>     env.execute();
>
> Best Regards,
> Paolo
>
> On 17 July 2017 at 13:10, Chesnay Schepler <chesnay@apache.org> wrote:
>
>     Hello,
>
>     As for 1), my suspicion is that this is caused by chaining. If the
>     map function is chained to the kafka source then the latency
>     markers are always immediately forwarded, regardless of what your
>     map function is doing.
>     If the map function is indeed chained to the source, could you try
>     again after disabling the chain by calling
>     `X.map(...).createNewChain()` and report back?
>
>     As for 2), I don't think this is possible right now.
>
>     Regards,
>     Chesnay


Re: Latency Measurement

Posted by Chesnay Schepler <ch...@apache.org>.
Hello,

As for 1), my suspicion is that this is caused by chaining. If the map 
function is chained to the kafka source then the latency markers are 
always immediately forwarded, regardless of what your map function is doing.
If the map function is indeed chained to the source, could you try again 
after disabling the chain by calling `X.map(...).createNewChain()` and 
report back?

As for 2), I don't think this is possible right now.

Regards,
Chesnay
