You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Anil Dasari (Jira)" <ji...@apache.org> on 2022/01/19 18:58:00 UTC

[jira] [Created] (KAFKA-13601) Add option to support sync offset commit in Kafka Connect Sink

Anil Dasari created KAFKA-13601:
-----------------------------------

             Summary: Add option to support sync offset commit in Kafka Connect Sink
                 Key: KAFKA-13601
                 URL: https://issues.apache.org/jira/browse/KAFKA-13601
             Project: Kafka
          Issue Type: New Feature
          Components: KafkaConnect
            Reporter: Anil Dasari


Exactly once in s3 connector with scheduled rotation and field partitioner can be achieved with consumer offset sync' commit. 

Currently, WorkerSinkTask committing the consumer offsets asynchronously. 
private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, final int seqno) \{
        log.info("{} Committing offsets", this);
        if (closing) \{
            doCommitSync(offsets, seqno);
        } else \{
            OffsetCommitCallback cb = new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
                    lastCommittedOffsets = offsets;
                    onCommitCompleted(error, seqno);
                }
            };
            consumer.commitAsync(offsets, cb);
        }
    }
 

Add config to sink to chose sync' offset commit



--
This message was sent by Atlassian Jira
(v8.20.1#820001)