Posted to issues@flink.apache.org by "Robert Metzger (Jira)" <ji...@apache.org> on 2022/10/31 07:18:00 UTC

[jira] [Commented] (FLINK-28790) Incorrect KafkaProducer metrics initialization

    [ https://issues.apache.org/jira/browse/FLINK-28790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626441#comment-17626441 ] 

Robert Metzger commented on FLINK-28790:
----------------------------------------

Thanks for reporting this issue. 
Correct me if I'm wrong, but my gut feeling is that fix 1 is nicer, as it exposes all metrics. However, wouldn't it be a breaking change in the next Flink release, because some metrics would get a new name? (Or am I wrong, because these are just additional tags?)



> Incorrect KafkaProducer metrics initialization
> ----------------------------------------------
>
>                 Key: FLINK-28790
>                 URL: https://issues.apache.org/jira/browse/FLINK-28790
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.4, 1.15.1
>            Reporter: Valentina Predtechenskaya
>            Priority: Major
>
> Problem
> KafkaProducer Flink metrics behave unpredictably because broker and topic metrics are initialized concurrently.
> Reproducing
> We first found the problem on our Flink cluster: the metric KafkaProducer.outgoing-byte-rate was periodically missing (equal to zero or near zero) on several subtasks, while on other subtasks the same metric was fine. The actual outgoing rate was the same across subtasks, as was clear from, for example, the KafkaProducer.records-send-rate metric, which was correct on every subtask, so the problem was definitely with the metric itself.
> After a long investigation, we found the root cause of this behavior:
>  
> * KafkaWriter creates an instance of FlinkKafkaInternalProducer and then [initializes|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L327-L330] metric wrappers over existing KafkaProducer metrics (gauges)
> * KafkaProducer itself, in its constructor, [creates a Sender|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L458-L460] to access the brokers, starts a thread (kafka-producer-network-thread), and runs the Sender in that separate thread
> * After the Sender starts, the topic and broker metrics are registered some time later. If they are registered quickly, KafkaWriter sees them before its initialization finishes and wraps them as Flink gauges. Otherwise, they are never wrapped.
> * [Some KafkaProducer metrics|https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics] at the producer level and at the broker level have the same name, for example outgoing-byte-rate
> * If two metrics have the same name, Flink's KafkaWriter [overwrites|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L359-L360] the metric in the wrapper
> So, to reproduce this bug, it is enough to run any job with a Kafka sink and look at the KafkaProducer metrics: some of them (broker or topic metrics) will be absent, or some will be overwritten (like outgoing-byte-rate in the example above).
> I think there are at least two ways to fix it:
> 1. Add the tag (producer-metric, producer-node-metric, etc.) to Flink's metric names
> 2. Use only metrics with tag=producer-metrics and ignore all other tags, i.e. do not expose broker and topic metrics at all
>  
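A minimal Java sketch of the name collision described above and of the two proposed fixes. This is not Flink's KafkaWriter code: ClientMetric, byNameOnly, byGroupAndName and producerGroupOnly are hypothetical names, each client metric is reduced to a group, a name and a value, and the group names used are the Kafka producer metric groups (producer-metrics, producer-node-metrics). It assumes, as the report describes, that wrappers are keyed by metric name alone and that a later entry replaces an earlier one, so which metric "wins" depends on iteration order. Records require Java 16 or later.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MetricCollisionSketch {

    // Hypothetical, simplified stand-in for a Kafka client metric (group + name + value).
    record ClientMetric(String group, String name, double value) {}

    // Behaviour described in the report (assumed): wrappers keyed by metric name only,
    // so a later metric with the same name overwrites an earlier one.
    static Map<String, ClientMetric> byNameOnly(List<ClientMetric> metrics) {
        Map<String, ClientMetric> wrappers = new LinkedHashMap<>();
        for (ClientMetric m : metrics) {
            wrappers.put(m.name(), m);
        }
        return wrappers;
    }

    // Fix 1 (sketch): put the group into the key, so same-named metrics coexist.
    static Map<String, ClientMetric> byGroupAndName(List<ClientMetric> metrics) {
        Map<String, ClientMetric> wrappers = new LinkedHashMap<>();
        for (ClientMetric m : metrics) {
            wrappers.put(m.group() + "." + m.name(), m);
        }
        return wrappers;
    }

    // Fix 2 (sketch): expose only producer-level metrics, ignore node and topic groups.
    static Map<String, ClientMetric> producerGroupOnly(List<ClientMetric> metrics) {
        Map<String, ClientMetric> wrappers = new LinkedHashMap<>();
        for (ClientMetric m : metrics) {
            if ("producer-metrics".equals(m.group())) {
                wrappers.put(m.name(), m);
            }
        }
        return wrappers;
    }

    public static void main(String[] args) {
        // Iteration order is an assumption; here the node-level metric happens to come last.
        List<ClientMetric> metrics = List.of(
                new ClientMetric("producer-metrics", "outgoing-byte-rate", 1200.0),
                new ClientMetric("producer-node-metrics", "outgoing-byte-rate", 0.0));

        // Only one entry survives: the node-level metric with value 0.0.
        System.out.println(byNameOnly(metrics));
        // [producer-metrics.outgoing-byte-rate, producer-node-metrics.outgoing-byte-rate]
        System.out.println(byGroupAndName(metrics).keySet());
        // 1200.0
        System.out.println(producerGroupOnly(metrics).get("outgoing-byte-rate").value());
    }
}
```

With name-only keys, the producer-level outgoing-byte-rate is replaced by the node-level one, matching the near-zero values in the report. Fix 1 keeps both under distinct keys, which is where the naming question in the comment above comes from; fix 2 keeps the producer-level value and simply drops the node metric.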



--
This message was sent by Atlassian Jira
(v8.20.10#820010)