Posted to jira@kafka.apache.org by "Atul Jain (Jira)" <ji...@apache.org> on 2023/01/05 18:03:00 UTC
[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Atul Jain updated KAFKA-14597:
------------------------------
Description:
I was following the KIP-613 documentation ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams]) and the Kafka Streams documentation ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end]). According to both documents, *record-e2e-latency-max* should report the full end-to-end latency, which includes both the *consumption latency* and the {*}processing delay{*}.
However, based on my observations, record-e2e-latency-max seems to measure only the consumption latency, while processing delays are captured by *process-latency-max*. I checked this using a simple topology consisting of a source, processors, and a sink (code included). I added a 3-second sleep in one of the processors to introduce a delay in the processing logic. This delay is not reflected in record-e2e-latency-max but is reflected in process-latency-max: process-latency-max was observed to be 3002 ms, which accounts for the 3-second sleep, whereas record-e2e-latency-max observed in jconsole was 422 ms, which does not.
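The arithmetic behind this expectation can be sketched as follows. This is a minimal illustration, not Kafka Streams code: the class `E2eLatencySketch` and its method are hypothetical, the record timestamp of 0 is arbitrary, and it assumes the two observed maxima (422 ms and 3002 ms) belong to the same record, which the report does not confirm.

```java
public class E2eLatencySketch {

    /** Latency of a record relative to its timestamp, as seen at wall-clock time nowMs. */
    static long latencyMs(long recordTimestampMs, long nowMs) {
        return nowMs - recordTimestampMs;
    }

    public static void main(String[] args) {
        long recordTimestampMs = 0L;   // arbitrary record timestamp (assumption)
        long arrivalMs = 422L;         // record reaches the source node (value from the report)
        long processingMs = 3002L;     // time spent in the processors (value from the report)

        // Measured when the record arrives, before any processing has happened.
        long atSource = latencyMs(recordTimestampMs, arrivalMs);
        // Measured after the processors have run, which is what "end to end" suggests.
        long afterProcessing = latencyMs(recordTimestampMs, arrivalMs + processingMs);

        System.out.println("latency at source: " + atSource + " ms");
        System.out.println("latency after processing: " + afterProcessing + " ms");
    }
}
```

Under these assumptions, a metric that includes the processing delay would be expected to show a value of roughly 3424 ms rather than 422 ms.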
Code describing my topology:
{code:java}
static Topology buildTopology(String inputTopic, String outputTopic) {
    log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);
    Serde<String> stringSerde = Serdes.String();
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
        .peek((k, v) -> log.info("Observed event: key" + k + " value: " + v))
        .mapValues(s -> {
            try {
                System.out.println("sleeping for 3 seconds");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s.toUpperCase();
        })
        .peek((k, v) -> log.info("Transformed event: key" + k + " value: " + v))
        .to(outputTopic, Produced.with(stringSerde, stringSerde));
    return builder.build();
}
{code}
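The two metrics can also be read programmatically instead of through jconsole, which makes it easier to compare them side by side. The following is a minimal sketch using only the standard `javax.management` API. It assumes the code runs inside the same JVM as the Streams application and that Kafka's JMX reporter has registered its MBeans under the `kafka.streams` domain of the platform MBean server. The class name `LatencyMetricsDump` is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class LatencyMetricsDump {

    // Attribute names taken from the report above.
    static final String[] ATTRIBUTES = {"record-e2e-latency-max", "process-latency-max"};

    /** Collects the wanted attributes from every MBean in the kafka.streams domain. */
    static Map<String, Object> readLatencyMetrics(MBeanServer server) {
        Map<String, Object> result = new TreeMap<>();
        try {
            for (ObjectName name : server.queryNames(new ObjectName("kafka.streams:*"), null)) {
                for (MBeanAttributeInfo info : server.getMBeanInfo(name).getAttributes()) {
                    for (String wanted : ATTRIBUTES) {
                        if (wanted.equals(info.getName())) {
                            result.put(name + " " + wanted, server.getAttribute(name, wanted));
                        }
                    }
                }
            }
        } catch (JMException e) {
            throw new IllegalStateException("Failed to read JMX metrics", e);
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints nothing when no Kafka Streams MBeans are registered in this JVM.
        readLatencyMetrics(ManagementFactory.getPlatformMBeanServer())
            .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Calling `readLatencyMetrics` periodically from the application (for example from a scheduled task) would log both values with their full MBean names, so it is clear which thread, task, and processor node each value belongs to.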
> [Streams] record-e2e-latency-max is not reporting correct metrics
> ------------------------------------------------------------------
>
> Key: KAFKA-14597
> URL: https://issues.apache.org/jira/browse/KAFKA-14597
> Project: Kafka
> Issue Type: Bug
> Reporter: Atul Jain
> Priority: Major
> Attachments: process-latency-max.jpg, record-e2e-latency-max.jpg
--
This message was sent by Atlassian Jira
(v8.20.10#820010)