Posted to jira@kafka.apache.org by "Yu-Jhe Li (Jira)" <ji...@apache.org> on 2021/10/26 10:00:03 UTC

[jira] [Updated] (KAFKA-13404) Kafka sink connectors do not commit offset correctly if messages are produced in transaction

     [ https://issues.apache.org/jira/browse/KAFKA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu-Jhe Li updated KAFKA-13404:
------------------------------
    Description: 
Kafka sink connectors do not commit offsets up to the log-end offset when the messages are produced in a transaction.

From the code of [WorkerSinkTask.java|#L467], we found that the sink connector takes the offset from each message and commits it to Kafka after the messages are processed successfully. But messages produced in a transaction are followed by additional [control batches|http://kafka.apache.org/documentation/#controlbatch] that mark the transaction as committed or aborted. These control batches occupy offsets in the log but are never delivered to the connector, so the committed offset stays behind the log-end offset.
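
The offset gap can be illustrated with a small self-contained model. This is only a sketch under assumptions, not the actual WorkerSinkTask code: it assumes a single partition, one committed transaction, one transaction marker that takes exactly one offset, and a sink that commits "offset of the last delivered message + 1". The class and method names (OffsetGapSketch, dataOffsets, logEndOffset, committedOffset) are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one partition's log; these are not Kafka classes.
final class OffsetGapSketch {

    /** Offsets of the data records of one transaction that starts at baseOffset. */
    static List<Long> dataOffsets(long baseOffset, int recordCount) {
        List<Long> offsets = new ArrayList<>();
        for (int i = 0; i < recordCount; i++) {
            offsets.add(baseOffset + i);
        }
        return offsets;
    }

    /** Log-end offset after the transaction: the data records plus one marker (assumed). */
    static long logEndOffset(long baseOffset, int recordCount) {
        return baseOffset + recordCount + 1;
    }

    /** Offset committed by a sink that uses "last delivered offset + 1" (assumed). */
    static long committedOffset(List<Long> delivered) {
        return delivered.get(delivered.size() - 1) + 1;
    }

    public static void main(String[] args) {
        // Ten messages in one transaction; the marker is never delivered to the sink.
        List<Long> delivered = dataOffsets(0L, 10);
        long committed = committedOffset(delivered);
        long logEnd = logEndOffset(0L, 10);
        System.out.println("committed=" + committed
                + " logEnd=" + logEnd
                + " lag=" + (logEnd - committed));
    }
}
```

Under these assumptions the model reports a committed offset of 10 against a log-end offset of 11, i.e. a lag of 1 that remains until further messages arrive.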

 

You can reproduce it by running `connect-file-sink` with the following properties:
{noformat}
/opt/kafka/bin/connect-standalone.sh /connect-standalone.properties /connect-file-sink.properties{noformat}
 
{code:java}
# connect-standalone.properties
bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

# for testing
offset.flush.interval.ms=10000

consumer.isolation.level=read_committed
consumer.auto.offset.reset=latest
{code}
 
{code:java}
# connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=test{code}
Then use a Java producer to produce 10 messages to 

 

 

 

  was:
The Kafka sink connectors don't commit offset to the latest log-end offset if the messages are produced in a transaction.

From the code of [WorkerSinkTask.java|#L467]], we found that the sink connector gets offset from messages and commits it to Kafka after the messages are processed successfully.

 


> Kafka sink connectors do not commit offset correctly if messages are produced in transaction
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13404
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13404
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.6.1
>            Reporter: Yu-Jhe Li
>            Priority: Major
>


