Posted to jira@kafka.apache.org by "Bart De Neuter (Jira)" <ji...@apache.org> on 2021/03/15 20:49:00 UTC

[jira] [Created] (KAFKA-12468) Initial offsets are copied from source to target cluster

Bart De Neuter created KAFKA-12468:
--------------------------------------

             Summary: Initial offsets are copied from source to target cluster
                 Key: KAFKA-12468
                 URL: https://issues.apache.org/jira/browse/KAFKA-12468
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
    Affects Versions: 2.7.0
            Reporter: Bart De Neuter


We have an active-passive setup in which the three MirrorMaker 2 connectors (heartbeat, checkpoint and source) run on a dedicated Kafka Connect cluster attached to the target cluster.

Offset syncing is enabled as described in KIP-545. When it is activated, however, the offsets from the source cluster appear to be copied to the target cluster initially without translation, which results in a negative lag for all synced consumer groups. The sync only starts working correctly after we reset the offsets for each topic/partition on the target cluster and produce a record to that topic/partition on the source.
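
To make the negative lag concrete: lag on a partition is normally reported as the log end offset minus the group's committed offset. The following is a minimal sketch with made-up numbers (they are not taken from our clusters) showing what happens when a source offset lands on the target unchanged. The function name `consumer_lag` is hypothetical.

```python
def consumer_lag(log_end_offset: int, committed_offset: int) -> int:
    """Lag as usually reported: log end offset minus committed offset."""
    return log_end_offset - committed_offset


# Hypothetical values, for illustration only.
source_committed = 1_000  # group's committed offset on the source partition
target_log_end = 200      # the mirrored partition on the target holds fewer records

# The source offset is written to the target without translation,
# so the committed offset is larger than the target's log end offset.
untranslated_lag = consumer_lag(target_log_end, source_committed)
print(untranslated_lag)  # -800
```

Any committed offset beyond the target's log end offset produces a negative value in this calculation, which matches the symptom described above.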

I would expect the consumer groups to be synced, but without the current offsets of the source cluster being copied as-is to the target cluster.
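
The expected behaviour can be sketched roughly as follows. This is not the actual MirrorMaker 2 code: `OffsetSync` and `translate` are hypothetical names, and the linear extrapolation from the sync point is an assumption made for illustration. The point is only that no offset should be written to the target until a matching pair of source and target offsets is known for the partition.

```python
from typing import NamedTuple, Optional


class OffsetSync(NamedTuple):
    """Hypothetical record pairing a source offset with its target offset."""
    upstream_offset: int
    downstream_offset: int


def translate(sync: Optional[OffsetSync], upstream_committed: int) -> Optional[int]:
    """Return the target offset for a source group offset, or None if unknown."""
    if sync is None:
        # No sync seen yet for this partition: nothing should be committed.
        return None
    if upstream_committed < sync.upstream_offset:
        # The committed offset is older than the sync point: cannot translate.
        return None
    # Assumption: offsets advance in step after the sync point.
    return sync.downstream_offset + (upstream_committed - sync.upstream_offset)


print(translate(None, 1000))                  # None
print(translate(OffsetSync(900, 100), 1000))  # 200
```

With this shape, a group whose partition has no sync yet is simply skipped instead of receiving the raw source offset.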

This is the configuration we are currently using:

Heartbeat connector:
{code:json}
{
  "name": "mm2-mirror-heartbeat",
  "config": {
    "name": "mm2-mirror-heartbeat",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "source.cluster.alias": "eventador",
    "target.cluster.alias": "msk",
    "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
    "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
    "topics": ".*",
    "groups": ".*",
    "tasks.max": "1",
    "replication.policy.class": "CustomReplicationPolicy",
    "sync.group.offsets.enabled": "true",
    "sync.group.offsets.interval.seconds": "5",
    "emit.checkpoints.enabled": "true",
    "emit.checkpoints.interval.seconds": "30",
    "emit.heartbeats.interval.seconds": "30",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
{code}
Checkpoint connector:
{code:json}
{
  "name": "mm2-mirror-checkpoint",
  "config": {
    "name": "mm2-mirror-checkpoint",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "source.cluster.alias": "eventador",
    "target.cluster.alias": "msk",
    "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
    "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
    "topics": ".*",
    "groups": ".*",
    "tasks.max": "40",
    "replication.policy.class": "CustomReplicationPolicy",
    "sync.group.offsets.enabled": "true",
    "sync.group.offsets.interval.seconds": "5",
    "emit.checkpoints.enabled": "true",
    "emit.checkpoints.interval.seconds": "30",
    "emit.heartbeats.interval.seconds": "30",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
{code}
Source connector:
{code:json}
{
  "name": "mm2-mirror-source",
  "config": {
    "name": "mm2-mirror-source",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "eventador",
    "target.cluster.alias": "msk",
    "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
    "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
    "topics": ".*",
    "groups": ".*",
    "tasks.max": "40",
    "replication.policy.class": "CustomReplicationPolicy",
    "sync.group.offsets.enabled": "true",
    "sync.group.offsets.interval.seconds": "5",
    "emit.checkpoints.enabled": "true",
    "emit.checkpoints.interval.seconds": "30",
    "emit.heartbeats.interval.seconds": "30",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
{code}
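
The three configurations differ only in the connector name, the connector class and `tasks.max`. As a compact way to see that, here is a small sketch that builds the same three payloads from one shared set of settings. The values are copied from the configs above, with surrounding whitespace stripped from the class names; the helper name `build_payload` is hypothetical, and the placeholders `<SOURCE_CLUSTER>` and `<TARGET_CLUSTER>` are left as they are.

```python
import json

# Settings shared by all three connectors.
COMMON = {
    "source.cluster.alias": "eventador",
    "target.cluster.alias": "msk",
    "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
    "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
    "topics": ".*",
    "groups": ".*",
    "replication.policy.class": "CustomReplicationPolicy",
    "sync.group.offsets.enabled": "true",
    "sync.group.offsets.interval.seconds": "5",
    "emit.checkpoints.enabled": "true",
    "emit.checkpoints.interval.seconds": "30",
    "emit.heartbeats.interval.seconds": "30",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
}

# (name suffix, connector class, tasks.max) for each connector.
CONNECTORS = [
    ("heartbeat", "MirrorHeartbeatConnector", "1"),
    ("checkpoint", "MirrorCheckpointConnector", "40"),
    ("source", "MirrorSourceConnector", "40"),
]


def build_payload(suffix: str, connector_class: str, tasks_max: str) -> dict:
    """Build one connector payload in the same shape as the JSON above."""
    name = f"mm2-mirror-{suffix}"
    config = {
        "name": name,
        "connector.class": f"org.apache.kafka.connect.mirror.{connector_class}",
        **COMMON,
        "tasks.max": tasks_max,
    }
    return {"name": name, "config": config}


payloads = [build_payload(*entry) for entry in CONNECTORS]
print(json.dumps(payloads[0], indent=2))
```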
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)