Posted to jira@kafka.apache.org by "Ning Zhang (Jira)" <ji...@apache.org> on 2020/08/05 16:45:00 UTC
[jira] [Comment Edited] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171612#comment-17171612 ]
Ning Zhang edited comment on KAFKA-10339 at 8/5/20, 4:44 PM:
-------------------------------------------------------------
Thanks for the input. I think that sounds like a workable plan. Here are my follow-up thoughts:
_"when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster."_
Since the consumer is configured to pull data from the source cluster, I think we probably need to:
(1) add a new API, `Map<TopicPartition, Long> loadOffsets()`, to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java];
(2) in MirrorSinkTask.java, implement (override) loadOffsets() to supply the consumer offsets loaded from the target cluster;
(3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use the returned offsets as the starting point.
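A rough, self-contained sketch of steps (1) to (3), only to make the control flow concrete. Everything here is hypothetical: `TopicPartition` is a minimal stand-in for the Kafka class so it compiles without kafka-clients, `SinkTaskWithOffsets`, `MirrorSinkTaskSketch` and `startingOffsets` are illustrative names rather than existing Connect APIs, and the real worker would seek the consumer instead of returning a map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class LoadOffsetsSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition so the
    // sketch compiles without kafka-clients on the classpath.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override public int hashCode() { return Objects.hash(topic, partition); }

        @Override public String toString() { return topic + "-" + partition; }
    }

    // Step (1): the proposed hook (hypothetical). The default returns an empty
    // map, so sink tasks that do not override it behave as before.
    interface SinkTaskWithOffsets {
        default Map<TopicPartition, Long> loadOffsets() {
            return new HashMap<>();
        }
    }

    // Step (2): a MirrorSinkTask-like override. Here the offsets are passed in;
    // in the proposal they would be read from the target cluster.
    static final class MirrorSinkTaskSketch implements SinkTaskWithOffsets {
        private final Map<TopicPartition, Long> targetClusterOffsets;

        MirrorSinkTaskSketch(Map<TopicPartition, Long> targetClusterOffsets) {
            this.targetClusterOffsets = targetClusterOffsets;
        }

        @Override public Map<TopicPartition, Long> loadOffsets() {
            return new HashMap<>(targetClusterOffsets);
        }
    }

    // Step (3): WorkerSinkTask-like start-up. A non-empty result from
    // loadOffsets() becomes the starting point; otherwise the consumer's own
    // committed offsets are used, as today. The real worker would seek the
    // consumer to each returned offset instead of returning a map.
    static Map<TopicPartition, Long> startingOffsets(SinkTaskWithOffsets task,
                                                     Map<TopicPartition, Long> consumerCommitted) {
        Map<TopicPartition, Long> fromTask = task.loadOffsets();
        if (fromTask == null || fromTask.isEmpty()) {
            return new HashMap<>(consumerCommitted);
        }
        return new HashMap<>(fromTask);
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("topic1", 0);
        Map<TopicPartition, Long> consumerCommitted = new HashMap<>();
        consumerCommitted.put(tp, 5L);
        Map<TopicPartition, Long> targetCommitted = new HashMap<>();
        targetCommitted.put(tp, 42L);

        // Mirror-style task: offsets from the target cluster win.
        System.out.println(startingOffsets(new MirrorSinkTaskSketch(targetCommitted), consumerCommitted));
        // Ordinary task without an override: falls back to the consumer's offsets.
        System.out.println(startingOffsets(new SinkTaskWithOffsets() {}, consumerCommitted));
    }
}
```

Using a default method for the new hook is one way to keep the change backward compatible for existing sink connectors; whether the real API should instead merge per partition with the consumer's committed offsets is left open here.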
_"in addition, we call addOffsetsToTransaction in order to commit offsets to the "fake" consumer group on the target cluster."_
I fully agree with using a "fake" consumer group on the target cluster. Is "addOffsetsToTransaction" already taken care of by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*?
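To illustrate why committing the "fake" group's offsets inside the producer transaction matters, here is a small in-memory model. It is not the Kafka client: `TargetCluster` and `TxnProducerModel` are hypothetical, partitions are plain "topic-partition" strings, and "mm2-fake-group" is a made-up group id. The method names only mirror KafkaProducer's transactional calls (beginTransaction, send, sendOffsetsToTransaction, commitTransaction, abortTransaction). As far as I understand, the real sendOffsetsToTransaction already adds the offsets to the ongoing transaction (the AddOffsetsToTxn step), which is what this model assumes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TxnOffsetsSketch {

    // In-memory stand-in for the target cluster: mirrored records plus the
    // committed offsets per consumer group (keys are "topic-partition" strings).
    static final class TargetCluster {
        final List<String> records = new ArrayList<>();
        final Map<String, Map<String, Long>> groupOffsets = new HashMap<>();
    }

    // Hypothetical model; method names mirror KafkaProducer's transactional API,
    // but it only demonstrates the all-or-nothing outcome.
    static final class TxnProducerModel {
        private final TargetCluster cluster;
        private final List<String> pendingRecords = new ArrayList<>();
        private final Map<String, Map<String, Long>> pendingOffsets = new HashMap<>();
        private boolean inTxn = false;

        TxnProducerModel(TargetCluster cluster) { this.cluster = cluster; }

        void beginTransaction() {
            if (inTxn) throw new IllegalStateException("transaction already open");
            inTxn = true;
        }

        void send(String record) {
            requireTxn();
            pendingRecords.add(record);
        }

        // Buffers the offsets as part of the open transaction, next to the records.
        void sendOffsetsToTransaction(Map<String, Long> offsets, String consumerGroupId) {
            requireTxn();
            pendingOffsets.computeIfAbsent(consumerGroupId, g -> new HashMap<>()).putAll(offsets);
        }

        // Records and offsets become visible together.
        void commitTransaction() {
            requireTxn();
            cluster.records.addAll(pendingRecords);
            pendingOffsets.forEach((group, offs) ->
                cluster.groupOffsets.computeIfAbsent(group, g -> new HashMap<>()).putAll(offs));
            reset();
        }

        // Neither records nor offsets become visible.
        void abortTransaction() {
            requireTxn();
            reset();
        }

        private void requireTxn() {
            if (!inTxn) throw new IllegalStateException("no open transaction");
        }

        private void reset() {
            pendingRecords.clear();
            pendingOffsets.clear();
            inTxn = false;
        }
    }

    public static void main(String[] args) {
        TargetCluster target = new TargetCluster();
        TxnProducerModel producer = new TxnProducerModel(target);
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic1-0", 101L); // next source offset to consume

        producer.beginTransaction();
        producer.send("record@100");
        producer.sendOffsetsToTransaction(offsets, "mm2-fake-group");
        producer.abortTransaction(); // nothing becomes visible

        producer.beginTransaction();
        producer.send("record@100");
        producer.sendOffsetsToTransaction(offsets, "mm2-fake-group");
        producer.commitTransaction(); // record and offset become visible together

        System.out.println(target.records + " " + target.groupOffsets);
    }
}
```

On restart, the offsets stored for the fake group on the target cluster are then exactly the ones matching the records that were committed, which is what a loadOffsets()-style hook would read back.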
> MirrorMaker2 Exactly-once Semantics
> -----------------------------------
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect
> Reporter: Ning Zhang
> Assignee: Ning Zhang
> Priority: Major
> Labels: needs-kip
>
> MirrorMaker2 is currently implemented on the Kafka Connect framework, more specifically as a Source Connector / Task, and source connectors and tasks do not provide exactly-once semantics (EOS) out of the box, as discussed in https://github.com/confluentinc/kafka-connect-jdbc/issues/461, https://github.com/apache/kafka/pull/5553, https://issues.apache.org/jira/browse/KAFKA-6080 and https://issues.apache.org/jira/browse/KAFKA-3821. Therefore, MirrorMaker2 currently does not provide EOS.