Posted to jira@kafka.apache.org by "Hector Geraldino (Jira)" <ji...@apache.org> on 2023/02/03 04:55:00 UTC

[jira] [Assigned] (KAFKA-14659) source-record-write-[rate|total] metrics include filtered records

     [ https://issues.apache.org/jira/browse/KAFKA-14659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hector Geraldino reassigned KAFKA-14659:
----------------------------------------

    Assignee: Hector Geraldino

> source-record-write-[rate|total] metrics include filtered records
> -----------------------------------------------------------------
>
>                 Key: KAFKA-14659
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14659
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Chris Beard
>            Assignee: Hector Geraldino
>            Priority: Minor
>
> Source tasks in Kafka Connect offer two sets of metrics (documented in [ConnectMetricsRegistry.java|https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L173-L191]):
> ||Metric||Description||
> |source-record-poll-rate|The average per-second number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.|
> |source-record-write-rate|The average per-second number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations.|
> There are also corresponding "-total" metrics that capture the total number of records polled and written, respectively.
> In short, the "poll" metrics capture the number of messages sourced pre-transformation/filtering, and the "write" metrics should capture the number of messages ultimately written to Kafka post-transformation/filtering. However, the implementation of the {{source-record-write-*}} metrics _includes_ records filtered out by transformations (and also records that result in produce failures with the config {{errors.tolerance=all}}).
> h3. Details
> In [AbstractWorkerSourceTask.java|https://github.com/apache/kafka/blob/a382acd31d1b53cd8695ff9488977566083540b1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L397], each source record is passed through the transformation chain, which may filter it out. The result is then checked, and a record that was filtered out is accounted for in the internal metrics via {{counter.skipRecord()}}.
> {code:java}
> for (final SourceRecord preTransformRecord : toSend) {
>     retryWithToleranceOperator.sourceRecord(preTransformRecord);
>     final SourceRecord record = transformationChain.apply(preTransformRecord);
>     final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
>     // Filtered records (no ProducerRecord) and tolerated failures both take this branch
>     if (producerRecord == null || retryWithToleranceOperator.failed()) {
>         counter.skipRecord();
>         recordDropped(preTransformRecord);
>         continue;
>     }
>     ...
> {code}
> {{SourceRecordWriteCounter.skipRecord()}} is implemented as follows:
> {code:java}
>     ....
>     public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
>         assert batchSize > 0;
>         assert metricsGroup != null;
>         this.batchSize = batchSize;
>         counter = batchSize;
>         this.metricsGroup = metricsGroup;
>     }
>
>     // Called for each record that is dropped rather than written to Kafka
>     public void skipRecord() {
>         if (counter > 0 && --counter == 0) {
>             finishedAllWrites();
>         }
>     }
>     ....
>     private void finishedAllWrites() {
>         if (!completed) {
>             metricsGroup.recordWrite(batchSize - counter);
>             completed = true;
>         }
>     }
> {code}
> For example, if a batch starts with 100 records, {{batchSize}} and {{counter}} are both initialized to 100. If all 100 records are filtered out, {{counter}} is decremented 100 times, and {{finishedAllWrites()}} records {{batchSize - counter}} = 100 to the underlying {{source-record-write-*}} metrics rather than 0, which is the correct value according to the documentation for these metrics.
> h3. Solutions
> Assuming the documentation correctly captures the intent of the {{source-record-write-*}} metrics, it seems reasonable to fix these metrics so that filtered records are not counted.
> It may also be useful to add metrics that capture the rate and total number of records filtered out by transformations; this would require a KIP.
> I'm not yet sure how best to account for produce failures when {{errors.tolerance=all}} is set. Perhaps these failures deserve their own metrics?
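
The 100-record example can be reproduced with a standalone copy of the SourceRecordWriteCounter logic quoted in the issue. This is a minimal sketch, not Kafka's class: RecordingMetricsGroup is a hypothetical stand-in for SourceTaskMetricsGroup that only remembers the values passed to recordWrite(), and only the constructor, skipRecord() and finishedAllWrites() are carried over.

```java
import java.util.ArrayList;
import java.util.List;

class FilteredBatchDemo {

    // Hypothetical stand-in for SourceTaskMetricsGroup: it only remembers
    // every value passed to recordWrite() so the result can be inspected.
    static class RecordingMetricsGroup {
        final List<Integer> recordedWrites = new ArrayList<>();

        void recordWrite(int records) {
            recordedWrites.add(records);
        }
    }

    // Copy of the counting logic quoted in the issue, trimmed to the methods involved.
    static class BuggyWriteCounter {
        private final RecordingMetricsGroup metricsGroup;
        private final int batchSize;
        private int counter;
        private boolean completed = false;

        BuggyWriteCounter(int batchSize, RecordingMetricsGroup metricsGroup) {
            this.batchSize = batchSize;
            this.counter = batchSize;
            this.metricsGroup = metricsGroup;
        }

        // A skipped record only decrements the count of outstanding records.
        void skipRecord() {
            if (counter > 0 && --counter == 0) {
                finishedAllWrites();
            }
        }

        // batchSize - counter is the number of records no longer outstanding,
        // which includes skipped records as well as written ones.
        private void finishedAllWrites() {
            if (!completed) {
                metricsGroup.recordWrite(batchSize - counter);
                completed = true;
            }
        }
    }

    public static void main(String[] args) {
        RecordingMetricsGroup metrics = new RecordingMetricsGroup();
        BuggyWriteCounter counter = new BuggyWriteCounter(100, metrics);
        // Simulate a transformation that filters out every record in the batch.
        for (int i = 0; i < 100; i++) {
            counter.skipRecord();
        }
        System.out.println(metrics.recordedWrites); // prints [100] even though nothing was written
    }
}
```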
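
One possible direction for the proposed fix is to let skipped records still complete the batch while keeping them out of the value passed to recordWrite(). The sketch below is hypothetical and is not the change that was made in Kafka: FixedWriteCounter, RecordingMetricsGroup and recordAcked() (standing in for whatever the worker calls when a send is acknowledged) are illustrative names, and the class ignores synchronization between the task thread and producer callbacks.

```java
import java.util.ArrayList;
import java.util.List;

class FixedCounterDemo {

    // Hypothetical stand-in for SourceTaskMetricsGroup (records each recordWrite() value).
    static class RecordingMetricsGroup {
        final List<Integer> recordedWrites = new ArrayList<>();

        void recordWrite(int records) {
            recordedWrites.add(records);
        }
    }

    // Sketch of one possible fix: skipped records still count toward completing
    // the batch, but are subtracted from the value reported as written.
    // Single-threaded: no synchronization with producer callbacks.
    static class FixedWriteCounter {
        private final RecordingMetricsGroup metricsGroup;
        private final int batchSize;
        private int counter;
        private int skipped = 0;
        private boolean completed = false;

        FixedWriteCounter(int batchSize, RecordingMetricsGroup metricsGroup) {
            this.batchSize = batchSize;
            this.counter = batchSize;
            this.metricsGroup = metricsGroup;
        }

        // Called for records dropped by a transformation or a tolerated error.
        void skipRecord() {
            if (counter > 0) {
                skipped++;
                if (--counter == 0) {
                    finishedAllWrites();
                }
            }
        }

        // Hypothetical name: called once a record has actually been written to Kafka.
        void recordAcked() {
            if (counter > 0 && --counter == 0) {
                finishedAllWrites();
            }
        }

        private void finishedAllWrites() {
            if (!completed) {
                // Only records that were neither skipped nor still outstanding count as writes.
                metricsGroup.recordWrite(batchSize - counter - skipped);
                completed = true;
            }
        }
    }

    public static void main(String[] args) {
        // Batch of 100: 40 filtered out, 60 written.
        RecordingMetricsGroup mixed = new RecordingMetricsGroup();
        FixedWriteCounter c1 = new FixedWriteCounter(100, mixed);
        for (int i = 0; i < 40; i++) c1.skipRecord();
        for (int i = 0; i < 60; i++) c1.recordAcked();

        // Batch of 100, all filtered out.
        RecordingMetricsGroup allFiltered = new RecordingMetricsGroup();
        FixedWriteCounter c2 = new FixedWriteCounter(100, allFiltered);
        for (int i = 0; i < 100; i++) c2.skipRecord();

        System.out.println(mixed.recordedWrites + " " + allFiltered.recordedWrites); // prints [60] [0]
    }
}
```

Tracking skipped records separately, rather than shrinking batchSize, leaves the batch-completion rule untouched: the batch still finishes when counter reaches 0, whatever mix of writes and skips got it there.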
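
If filtered records and tolerated failures were to get their own metrics, the combined check producerRecord == null || retryWithToleranceOperator.failed() in the quoted loop would have to be split into two causes. A hypothetical sketch of that split, with an in-memory tally standing in for real metrics (Outcome, OutcomeTally and classify() are illustrative names, not Kafka APIs). It only covers the decision made before a record is sent; failures reported later by the producer under errors.tolerance=all would need a separate hook that this sketch does not model.

```java
import java.util.EnumMap;
import java.util.Map;

class OutcomeTallyDemo {

    // Hypothetical outcome categories; each could back its own metric.
    enum Outcome { SENT, FILTERED, FAILED }

    // In-memory tally standing in for per-outcome metrics.
    static class OutcomeTally {
        private final Map<Outcome, Integer> counts = new EnumMap<>(Outcome.class);

        void record(Outcome outcome) {
            counts.merge(outcome, 1, Integer::sum);
        }

        int get(Outcome outcome) {
            return counts.getOrDefault(outcome, 0);
        }
    }

    // Splits the combined skip condition into two causes. The tolerated-failure
    // check comes first, because a failed record may also have produced no ProducerRecord.
    static Outcome classify(boolean producerRecordIsNull, boolean toleratedFailure) {
        if (toleratedFailure) {
            return Outcome.FAILED;
        }
        return producerRecordIsNull ? Outcome.FILTERED : Outcome.SENT;
    }

    public static void main(String[] args) {
        OutcomeTally tally = new OutcomeTally();
        tally.record(classify(false, false)); // passed through and handed to the producer
        tally.record(classify(true, false));  // dropped by a filtering transformation
        tally.record(classify(true, true));   // transformation or conversion failed, tolerated
        tally.record(classify(false, false));
        System.out.println(tally.get(Outcome.SENT) + " sent, "
                + tally.get(Outcome.FILTERED) + " filtered, "
                + tally.get(Outcome.FAILED) + " failed"); // prints 2 sent, 1 filtered, 1 failed
    }
}
```

Checking the failure flag first matters: a record whose transformation or conversion failed can also leave producerRecord null, and in this sketch it is counted as failed rather than filtered.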



--
This message was sent by Atlassian Jira
(v8.20.10#820010)