Posted to jira@kafka.apache.org by "Ning Zhang (Jira)" <ji...@apache.org> on 2020/08/05 16:45:00 UTC

[jira] [Comment Edited] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

    [ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171612#comment-17171612 ] 

Ning Zhang edited comment on KAFKA-10339 at 8/5/20, 4:44 PM:
-------------------------------------------------------------

Thanks for the input. I think that sounds like a workable plan. Here are my follow-up thoughts:

_"when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster."_

Since the consumer is configured to pull data from the source cluster, I think we probably need to:
(1) add a new API, "Map<TopicPartition, Long> loadOffsets()", to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java]
(2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from the target cluster.
(3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use the returned offsets as the starting point.
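
To make steps (1) to (3) concrete, here is a rough sketch of how the pieces might fit together. It is only an illustration under stated assumptions: loadOffsets() is the proposed hook and does not exist in the Connect API today, and TopicPartition, OffsetLoadingTask, MirrorSinkTaskSketch and startingOffsets are hypothetical stand-ins (not the real Kafka classes) so that the example compiles without Kafka on the classpath.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class LoadOffsetsSketch {

    // Stand-in for org.apache.kafka.common.TopicPartition (assumption: same topic/partition identity).
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Step (1): the proposed hook. Hypothetical; the default keeps existing sink tasks unchanged.
    interface OffsetLoadingTask {
        default Map<TopicPartition, Long> loadOffsets() {
            return Collections.emptyMap();
        }
    }

    // Step (2): a MirrorSinkTask-like task that supplies offsets it read from the target cluster.
    // Here the offsets are simply passed in; a real task would have to fetch them.
    static final class MirrorSinkTaskSketch implements OffsetLoadingTask {
        private final Map<TopicPartition, Long> targetClusterOffsets;

        MirrorSinkTaskSketch(Map<TopicPartition, Long> targetClusterOffsets) {
            this.targetClusterOffsets = new HashMap<>(targetClusterOffsets);
        }

        @Override
        public Map<TopicPartition, Long> loadOffsets() {
            return Collections.unmodifiableMap(targetClusterOffsets);
        }
    }

    // Step (3): what the worker could do when initializing the consumer:
    // prefer the task-supplied offsets, otherwise keep the framework's own offsets.
    static Map<TopicPartition, Long> startingOffsets(OffsetLoadingTask task,
                                                     Map<TopicPartition, Long> frameworkOffsets) {
        Map<TopicPartition, Long> loaded = task.loadOffsets();
        return (loaded == null || loaded.isEmpty()) ? frameworkOffsets : loaded;
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("orders", 0);
        Map<TopicPartition, Long> frameworkOffsets = Collections.singletonMap(tp, 5L);
        Map<TopicPartition, Long> fromTarget = Collections.singletonMap(tp, 42L);

        OffsetLoadingTask mirrorTask = new MirrorSinkTaskSketch(fromTarget);
        System.out.println("start offset: " + startingOffsets(mirrorTask, frameworkOffsets).get(tp));
    }
}
```

In the real worker, the resolved offsets would presumably be applied by seeking the consumer on each assigned partition rather than by returning a map.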

_"in addition, we call addOffsetsToTransaction in order to commit offsets to the "fake" consumer group on the target cluster."_

I fully agree with using the "fake" consumer group on the target cluster. I am wondering whether "addOffsetsToTransaction" is already taken care of by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*?
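
For reference, the flow in question could look roughly like the sketch below: the mirrored records and the source offsets for the "fake" group are written in one transaction, so both become visible together or not at all. My understanding is that the Java client registers the group with the transaction as part of sendOffsetsToTransaction, but that is worth double-checking against the client code. Everything here is a hypothetical stand-in: TxnProducer only mirrors the method names of the real producer, InMemoryProducer is an in-memory fake, and string keys replace TopicPartition and OffsetAndMetadata.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransactionalMirrorSketch {

    // Hypothetical, minimal subset of the transactional producer calls (not the real KafkaProducer).
    interface TxnProducer {
        void beginTransaction();
        void send(String topic, String value);
        void sendOffsetsToTransaction(Map<String, Long> offsets, String consumerGroupId);
        void commitTransaction();
        void abortTransaction();
    }

    // In-memory fake: records and offsets stay pending until the transaction commits.
    static final class InMemoryProducer implements TxnProducer {
        final List<String> committedRecords = new ArrayList<>();
        final Map<String, Map<String, Long>> committedOffsets = new HashMap<>();
        private final List<String> pendingRecords = new ArrayList<>();
        private final Map<String, Map<String, Long>> pendingOffsets = new HashMap<>();

        public void beginTransaction() {
            pendingRecords.clear();
            pendingOffsets.clear();
        }

        public void send(String topic, String value) {
            pendingRecords.add(topic + ":" + value);
        }

        public void sendOffsetsToTransaction(Map<String, Long> offsets, String consumerGroupId) {
            pendingOffsets.computeIfAbsent(consumerGroupId, g -> new HashMap<>()).putAll(offsets);
        }

        public void commitTransaction() {
            committedRecords.addAll(pendingRecords);
            for (Map.Entry<String, Map<String, Long>> e : pendingOffsets.entrySet()) {
                committedOffsets.computeIfAbsent(e.getKey(), g -> new HashMap<>()).putAll(e.getValue());
            }
            pendingRecords.clear();
            pendingOffsets.clear();
        }

        public void abortTransaction() {
            pendingRecords.clear();
            pendingOffsets.clear();
        }
    }

    // Mirror one batch: write the records and the next source offset atomically.
    static void mirrorBatch(TxnProducer producer, String targetTopic, List<String> values,
                            String sourcePartition, long nextSourceOffset, String consumerGroupId) {
        producer.beginTransaction();
        try {
            for (String value : values) {
                producer.send(targetTopic, value);
            }
            Map<String, Long> offsetsMap = new HashMap<>();
            offsetsMap.put(sourcePartition, nextSourceOffset);
            producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);
            producer.commitTransaction();
        } catch (RuntimeException e) {
            // Simplification: the real client has fatal errors where the producer must be closed instead.
            producer.abortTransaction();
            throw e;
        }
    }

    public static void main(String[] args) {
        InMemoryProducer producer = new InMemoryProducer();
        mirrorBatch(producer, "source.orders", Arrays.asList("a", "b"), "orders-0", 2L, "fake-group");
        System.out.println(producer.committedRecords + " " + producer.committedOffsets);
    }
}
```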


> MirrorMaker2 Exactly-once Semantics
> -----------------------------------
>
>                 Key: KAFKA-10339
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10339
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Ning Zhang
>            Assignee: Ning Zhang
>            Priority: Major
>              Labels: needs-kip
>
> MirrorMaker2 is currently implemented on the Kafka Connect framework, more specifically on the Source Connector / Task, which do not provide exactly-once semantics (EOS) out of the box, as discussed in https://github.com/confluentinc/kafka-connect-jdbc/issues/461, https://github.com/apache/kafka/pull/5553, https://issues.apache.org/jira/browse/KAFKA-6080 and https://issues.apache.org/jira/browse/KAFKA-3821. Therefore, MirrorMaker2 currently does not provide EOS.


