Posted to jira@kafka.apache.org by "Yash Mayya (Jira)" <ji...@apache.org> on 2023/04/27 09:56:00 UTC

[jira] [Commented] (KAFKA-14947) Duplicate records are getting created in the topic.

    [ https://issues.apache.org/jira/browse/KAFKA-14947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717095#comment-17717095 ] 

Yash Mayya commented on KAFKA-14947:
------------------------------------

[~krishnendudas] this seems like a bug in the connector's implementation rather than a bug in the Connect framework in Apache Kafka. Since source connector offsets are committed by Connect workers periodically and asynchronously, there is no guarantee that offsets will be committed between successive poll calls. `OffsetStorageReader::offset` is typically used only during startup of connectors / tasks, to resume progress after restarts, pause / resume, and so on.
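This can be illustrated with a small, self-contained simulation (plain Java, no Kafka classes; the method names and the two-records-per-poll batching are made up for the sketch). A task that re-queries the offset store between polls sees a stale position, because the asynchronous commit has not run yet; a task that tracks its position in memory does not.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of why a source task should track its own position
// rather than re-query the offset store between polls: Connect commits
// offsets asynchronously, so the store can lag behind what poll() has
// already returned.
public class OffsetLagDemo {
    static final String[] LINES = {"a", "b", "c", "d"};

    // Buggy pattern: ask the offset store before every poll. The async
    // commit has not run between the two polls, so the store still says 0.
    static List<String> pollRereadingStore() {
        long committed = 0; // stale offset store; no commit ran in between
        List<String> out = new ArrayList<>();
        for (int poll = 0; poll < 2; poll++) {
            long pos = committed; // analogue of offsetStorageReader().offset(...)
            for (long i = pos; i < pos + 2 && i < LINES.length; i++)
                out.add(LINES[(int) i]);
        }
        return out; // [a, b, a, b] -- duplicates
    }

    // Correct pattern: read the offset once at startup, then advance an
    // in-memory position as batches are returned from poll().
    static List<String> pollTrackingInMemory() {
        long position = 0; // read from the offset store once, in start()
        List<String> out = new ArrayList<>();
        for (int poll = 0; poll < 2; poll++) {
            for (long i = position; i < position + 2 && i < LINES.length; i++)
                out.add(LINES[(int) i]);
            position += 2; // advance immediately, without waiting for a commit
        }
        return out; // [a, b, c, d] -- no duplicates
    }

    public static void main(String[] args) {
        System.out.println(pollRereadingStore());
        System.out.println(pollTrackingInMemory());
    }
}
```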

 

In your provided scenario, why can't the connector simply read from its previous position in the second poll, since it should be maintaining some internal state? Also note that Kafka Connect doesn't support exactly-once semantics for source connectors in 3.1.1; this functionality was added in 3.3.0. Depending on your specific connector, it might also need additional changes to support the changes made in [KIP-618|https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors] (this is publicly documented [here|https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors]). Another, more lightweight option might be for the connector to implement the [SourceTask::commit|https://javadoc.io/static/org.apache.kafka/connect-api/3.4.0/org/apache/kafka/connect/source/SourceTask.html#commit--] method and not advance its internal position / offset state until previous offsets are committed - however, it's important to note that this won't guarantee exactly-once semantics.
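The SourceTask::commit idea can be sketched the same way (plain Java, no Kafka classes; the field names `durable` / `inFlight` and the two-records-per-poll batching are illustrative, not Connect internals): the task emits a batch, then returns nothing from subsequent polls until the framework's commit() callback confirms the previous batch's offsets were flushed.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the SourceTask::commit approach: hold back the next batch
// until the framework confirms, via commit(), that the previous batch's
// offsets have been flushed. This bounds how much data can be re-read
// after a restart, but a crash between sending records and committing
// offsets can still redeliver the last batch, so this gives
// at-least-once semantics, not exactly-once.
public class CommitGateDemo {
    private final String[] lines = {"a", "b", "c", "d"};
    private long durable = 0;  // position covered by committed offsets
    private long inFlight = 0; // position reached by the batch already emitted

    // Analogue of SourceTask::poll.
    List<String> poll() {
        if (inFlight > durable)
            return List.of(); // previous batch not committed yet; emit nothing
        List<String> batch = new ArrayList<>();
        for (long i = inFlight; i < inFlight + 2 && i < lines.length; i++)
            batch.add(lines[(int) i]);
        inFlight += batch.size();
        return batch;
    }

    // Analogue of SourceTask::commit: the framework has flushed offsets
    // up to and including the last batch returned from poll().
    void commit() { durable = inFlight; }
}
```

In a real task, poll() would typically block or return null rather than an empty list, and positions would be keyed by source partition; this sketch only shows the gating logic.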

> Duplicate records are getting created in the topic. 
> ----------------------------------------------------
>
>                 Key: KAFKA-14947
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14947
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 3.1.1
>            Reporter: krishnendu Das
>            Priority: Blocker
>         Attachments: Kafka_server_3.1.1_data_duplication_issue_log
>
>
> We are using the Kafka Connect API (version 2.3.0) and Kafka (version 3.1.1) for data ingestion purposes. Previously we were using Kafka (version 2.6.2) with the same Kafka Connect API (version 2.3.0), and data ingestion was working correctly. 
>  
> Recently we updated Kafka from version 2.6.2 to 3.1.1.
> After the update, we are facing duplicate data issues from the source connector into the Kafka topic. After debugging the 3.1.1 code, we saw that a new function,
> {*}updateCommittableOffsets{*}(), was added and is called inside {*}WorkerSourceTask::execute{*}() as part of the bug fix --"KAFKA-12226: Commit source task offsets without blocking on batch delivery (#11323)"
>  
> Now, because of this function, we are observing the following scenario:
>  # At the start of the execute() loop, updateCommittableOffsets() is called to check whether there are any offsets ready to commit. As the first poll has not yet happened, it finds nothing to commit.
>  # Then the connector's poll() method is called from WorkerSourceTask::execute(). *---------> 1st poll*
>  # The connector (using its sleepy policy) reads one source file from the cloud source directory.
>  # It reads the whole content of the file and sends the result set to the Kafka brokers to be written to the Kafka topic.
>  # In the next iteration of the loop, updateCommittableOffsets() finds offsets to commit and updates the reference variable committableOffsets, which is later used by the WorkerSourceTask::commitOffsets() function to perform the actual offset commit.
>  # Then the connector's poll() method is called from *WorkerSourceTask::execute().* *---------> 2nd poll*
>  # The connector (using its sleepy policy) reads the same source file again from the start, as offsetStorageReader::offset() didn't return the latest offset.
>  # It reads the whole content of the file and sends the result set to the Kafka brokers to be written to the Kafka topic. ---> This creates duplicate data in the topic.
> ................................................
> ................................................
>  # WorkerSourceTask::commitOffsets() commits the offset.
> ................................................
> ................................................
>  # Then the connector's poll() method is called from {*}WorkerSourceTask::execute(){*}. ---------> 3rd poll
>  # This time offsetStorageReader::offset() returns the latest offset.
>  # The connector (using its sleepy policy) resumes reading the same source file from the last read position.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)