Posted to dev@flink.apache.org by "Valentina Predtechenskaya (Jira)" <ji...@apache.org> on 2022/08/03 11:07:00 UTC

[jira] [Created] (FLINK-28790) Incorrect KafkaProducer metrics initialization

Valentina Predtechenskaya created FLINK-28790:
-------------------------------------------------

             Summary: Incorrect KafkaProducer metrics initialization
                 Key: FLINK-28790
                 URL: https://issues.apache.org/jira/browse/FLINK-28790
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.15.1, 1.14.4
            Reporter: Valentina Predtechenskaya


Problem

Flink's KafkaProducer metrics behave unpredictably because the broker and topic metrics are initialized concurrently with the Flink metric wrappers.

Reproducing

We first noticed the problem on our Flink cluster. The KafkaProducer.outgoing-byte-rate metric was periodically missing (equal to zero or near zero) on several subtasks, while other subtasks reported it correctly at the same time. The actual outgoing rate was the same across subtasks. This was clear from, for example, the KafkaProducer.records-send-rate metric, which was fine on every subtask, so the problem was definitely with the metric itself.

After a long investigation, we found the root cause of this behavior:
 
* KafkaWriter creates an instance of FlinkKafkaInternalProducer and then [initializes|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L327-L330] Flink metric wrappers (gauges) over the KafkaProducer metrics that exist at that moment.
* In its constructor, KafkaProducer itself [creates a Sender|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L458-L460] to access the brokers, starts a thread (kafka-producer-network-thread) and runs the Sender in this separate thread.
* After the Sender starts, the topic and broker metrics are registered over some period of time. If they register quickly, KafkaWriter sees them before it finishes initialization and wraps them as Flink gauges. Otherwise, they are not wrapped.
* [Some KafkaProducer metrics|https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics] at the producer level and at the broker level have the same names, for example outgoing-byte-rate.
* If two metrics have the same name, Flink's KafkaWriter [overwrites|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L359-L360] the metric held by the existing wrapper.
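The interaction of the last three points can be modelled in a few lines. This is a simplified, self-contained sketch, not Flink's or Kafka's code: `MetricKey`, `snapshot` and `wrapByName` are hypothetical, the metric values are made up, and it assumes, as the linked KafkaWriter lines suggest, that wrappers are keyed by the metric name alone. The boolean passed to `snapshot` stands in for whether the network thread registered the broker metrics before the writer looked at them.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.DoubleSupplier;

// Hypothetical stand-in for Kafka's MetricName, reduced to group and name.
record MetricKey(String group, String name) {}

final class NameOnlyWrapping {

    // Models the writer's wrapping step: one gauge per metric *name*.
    // A later metric with the same name replaces the earlier one (last write wins).
    static Map<String, DoubleSupplier> wrapByName(Map<MetricKey, DoubleSupplier> producerMetrics) {
        Map<String, DoubleSupplier> gauges = new LinkedHashMap<>();
        for (Map.Entry<MetricKey, DoubleSupplier> e : producerMetrics.entrySet()) {
            gauges.put(e.getKey().name(), e.getValue());
        }
        return gauges;
    }

    // Builds the set of producer metrics visible to the writer; the broker-level
    // metric is present only if the network thread registered it in time.
    static Map<MetricKey, DoubleSupplier> snapshot(boolean nodeMetricsRegistered) {
        Map<MetricKey, DoubleSupplier> metrics = new LinkedHashMap<>();
        // Producer-wide rate (illustrative value).
        metrics.put(new MetricKey("producer-metrics", "outgoing-byte-rate"), () -> 1000.0);
        if (nodeMetricsRegistered) {
            // Per-broker rate with the same name (illustrative near-zero value).
            metrics.put(new MetricKey("producer-node-metrics", "outgoing-byte-rate"), () -> 0.0);
        }
        return metrics;
    }

    public static void main(String[] args) {
        double late = wrapByName(snapshot(false)).get("outgoing-byte-rate").getAsDouble();
        double early = wrapByName(snapshot(true)).get("outgoing-byte-rate").getAsDouble();
        System.out.println("node metrics registered late:  " + late);  // 1000.0
        System.out.println("node metrics registered early: " + early); // 0.0
    }
}
```

The model fixes the iteration order with a `LinkedHashMap`. In the real producer, the order in which same-named metrics are visited comes from Kafka's own metric registry, so which one ends up behind the Flink gauge should not be relied on either way.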

So, to reproduce this bug, it is enough to run any job with a Kafka Sink and look at the KafkaProducer metrics: some of them (broker or topic metrics) will be absent, or some will be overwritten (as with outgoing-byte-rate in the example above).
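When reproducing, it may help to list which metric names are shared by more than one group before looking at the dashboards. A minimal sketch, assuming the (group, name) pairs have already been extracted from the keys of `producer.metrics()` (with the real client, via `MetricName.group()` and `MetricName.name()`). `MetricId` and `collidingNames` are hypothetical helpers, and the input in `main` is illustrative, not a complete list of Kafka producer metrics.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical (group, name) pair taken from a Kafka MetricName.
record MetricId(String group, String name) {}

final class CollisionCheck {

    // Returns each metric name that appears in more than one group,
    // mapped to the sorted set of groups that define it.
    static Map<String, Set<String>> collidingNames(List<MetricId> ids) {
        Map<String, Set<String>> groupsByName = new TreeMap<>();
        for (MetricId id : ids) {
            groupsByName.computeIfAbsent(id.name(), n -> new TreeSet<>()).add(id.group());
        }
        // Keep only names defined in at least two groups.
        groupsByName.values().removeIf(groups -> groups.size() < 2);
        return groupsByName;
    }

    public static void main(String[] args) {
        List<MetricId> ids = List.of(
                new MetricId("producer-metrics", "outgoing-byte-rate"),
                new MetricId("producer-metrics", "buffer-available-bytes"),
                new MetricId("producer-node-metrics", "outgoing-byte-rate"));
        System.out.println(collidingNames(ids));
        // {outgoing-byte-rate=[producer-metrics, producer-node-metrics]}
    }
}
```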

I suppose there are at least two ways to fix it:
1. Add the Kafka metric group (producer-metrics, producer-node-metrics, etc.) to the Flink metric name.
2. Use only metrics from the producer-metrics group and ignore all other groups, i.e. do not expose the broker and topic metrics at all.
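Both options can be sketched on the same simplified model. This is an illustration under stated assumptions, not a proposed patch: `Key`, `qualified` and `producerOnly` are hypothetical names, and the dot-joined string is an arbitrary naming choice. One detail matters for option 1: broker metrics for different brokers share the producer-node-metrics group and differ only in their tags (for example node-id), so the sketch folds the tag values into the name as well, not just the group.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.DoubleSupplier;

// Hypothetical stand-in for Kafka's MetricName: group, name and tags.
record Key(String group, String name, Map<String, String> tags) {}

final class FixSketches {

    static final String PRODUCER_GROUP = "producer-metrics";

    // Option 1: keep all metrics, but build a unique name from the group,
    // the tag values (sorted by tag key) and the metric name.
    static Map<String, DoubleSupplier> qualified(Map<Key, DoubleSupplier> metrics) {
        Map<String, DoubleSupplier> gauges = new LinkedHashMap<>();
        metrics.forEach((key, value) -> {
            StringBuilder id = new StringBuilder(key.group());
            new TreeMap<>(key.tags()).values().forEach(v -> id.append('.').append(v));
            gauges.put(id.append('.').append(key.name()).toString(), value);
        });
        return gauges;
    }

    // Option 2: expose only producer-wide metrics and skip broker/topic ones.
    static Map<String, DoubleSupplier> producerOnly(Map<Key, DoubleSupplier> metrics) {
        Map<String, DoubleSupplier> gauges = new LinkedHashMap<>();
        metrics.forEach((key, value) -> {
            if (PRODUCER_GROUP.equals(key.group())) {
                gauges.put(key.name(), value);
            }
        });
        return gauges;
    }

    public static void main(String[] args) {
        Map<Key, DoubleSupplier> metrics = new LinkedHashMap<>();
        metrics.put(new Key("producer-metrics", "outgoing-byte-rate", Map.of()), () -> 1000.0);
        metrics.put(new Key("producer-node-metrics", "outgoing-byte-rate", Map.of("node-id", "node-1")), () -> 600.0);
        metrics.put(new Key("producer-node-metrics", "outgoing-byte-rate", Map.of("node-id", "node-2")), () -> 400.0);

        System.out.println(qualified(metrics).keySet());
        // [producer-metrics.outgoing-byte-rate, producer-node-metrics.node-1.outgoing-byte-rate, producer-node-metrics.node-2.outgoing-byte-rate]
        System.out.println(producerOnly(metrics).get("outgoing-byte-rate").getAsDouble()); // 1000.0
    }
}
```

In Flink itself, carrying the group and tags as nested metric groups (for example via `MetricGroup.addGroup`) may be more natural than a joined string; the joined key here only keeps the sketch short.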

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)