You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Alexis Josephides (Jira)" <ji...@apache.org> on 2021/08/04 09:29:00 UTC

[jira] [Commented] (KAFKA-12468) Initial offsets are copied from source to target cluster

    [ https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392914#comment-17392914 ] 

Alexis Josephides commented on KAFKA-12468:
-------------------------------------------

Thanks for the suggestions and apologies for the delay in updating how we handled this issue in the end.
Should say from the outset that we did not completely remove this issue but we minimised the occurrences, fixed some and in the remainder - lived with it.
The first step was minimisation. We achieved this via the phasing of turning on our connectors. The first connector we applied was the `Source` connector. For our setup we had a number of source connectors - some set to replicate from `latest` and others from `earliest`. We let this connector run and replicate until we hit a steady state and all replication was confirmed to be at the head of their relevant topic. This soak could be a few days depending on your data volumes, throughputs (client limits) etc.....
Once the soak has completed we then turned on the Checkpoint connector.

If there are negative offsets after this first step we then took steps to manage them. There are 2 categories here. Partitions that have data on them and partitions that have no data on them.
In the first instance (data on partitions) the first thing we try is to `delete` the affected consumer group. This is absolutely fine to do as a) no consumers on the target cluster yet, b) the group is replicated again by MM2.
In 90% of instances the negative offset was corrected.

In the second instance (no data on partitions) the first thing we examined is whether we could publish data (on source cluster) onto the topic to put data onto the partition. This was then followed by a refresh (delete) of the affected consumer group. This was possible only if the downstream consumer handled either dummy garbage messages ok or was fine with a small number of duplicate messages.

What if following the above a negative offset remained?
In the instance where there was zero data on a partition and no new data could be published to it we let the consumer migrate onto the target cluster without much worry. The Kafka consumer behaviour at this point would look at a negative offset and throw a warning that it was out of range. It would then reset it's offset on the cluster to its default setting - either consumer from `latest` or `earliest`. Since there is 0 data on that partition this is one and the same thing.

For instances (rare but did occur) where there remained a negative offset and data on the partition we still migrated and relied on the consumer behaviour to reset its offset to either `earliest` or `latest`. Depending on the consumer and it's use case we picked whichever best suited the scenario.

Hope this is helpful in some way to others that might be experiencing these issues.

> Initial offsets are copied from source to target cluster
> --------------------------------------------------------
>
>                 Key: KAFKA-12468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12468
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.7.0
>            Reporter: Bart De Neuter
>            Priority: Major
>
> We have an active-passive setup where  the 3 connectors from mirror maker 2 (heartbeat, checkpoint and source) are running on a dedicated Kafka connect cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it seems the offsets from the source cluster are initially copied to the target cluster without translation. This causes a negative lag for all synced consumer groups. Only when we reset the offsets for each topic/partition on the target cluster and produce a record on the topic/partition in the source, the sync starts working correctly. 
> I would expect that the consumer groups are synced but that the current offsets of the source cluster are not copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector
>  
> {code:xml}
> {
>   "name": "mm2-mirror-heartbeat",
>   "config": {
>     "name": "mm2-mirror-heartbeat",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "1",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
> Checkpoint connector:
> {code:xml}
> {
>   "name": "mm2-mirror-checkpoint",
>   "config": {
>     "name": "mm2-mirror-checkpoint",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "40",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  Source connector:
> {code:xml}
> {
>   "name": "mm2-mirror-source",
>   "config": {
>     "name": "mm2-mirror-source",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "40",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)