Posted to users@kafka.apache.org by Ning Zhang <ni...@gmail.com> on 2021/04/06 06:12:14 UTC

Re: MirrorMaker 2 and Negative Lag

Hello Frank,

Happy to look into it. Would you mind opening a Jira ticket with your and Alan's observations and assigning it to me? It would also be great to reference this thread in the ticket.

I will take some time to reproduce it when I am available.

Thanks

On 2021/03/30 19:08:56, Frank Yi <fy...@coursera.org> wrote: 
> Hey Ning,
> 
> I believe "if the CG offsets do not contain a pair of <topic, partition>,
> simply sync the offsets from source" could be the problematic behavior
> here? I'm not very familiar with Mirrormaker's internals, so can't say for
> sure.
> 
> As I described previously, this "negative lag" problem happens when a
> target partition is empty (log end offset = 0) and the source CG offset is
> greater than 0. This scenario can be reached if a consumer processed some
> records which were then deleted (e.g. by retention policy), or, like Alan
> encountered, when MM replication is set to start at "latest". When this is
> the case, MirrorMaker sets the target CG offset equal to the literal, not
> converted, source CG offset. The correct behavior here should be to set the
> target CG offset to 0 when the target partition is empty, not to the source
> CG offset.
> 
> The negative lag can cause CGs to miss messages during a migration if new
> messages are sent between stopping a CG on the source cluster and starting
> the CG on the target cluster.
> 
> 
> On Fri, Mar 26, 2021 at 1:52 AM Ning Zhang <ni...@gmail.com> wrote:
> 
> > Hello Frank,
> >
> > Thanks for helping to analyze the issue.
> >
> > Regarding when the CG offsets at the destination cluster are updated: from
> > the current codebase, there seem to be 2 criteria:
> >
> > (1) if the CG offsets do not contain a pair of <topic, partition>, simply
> > sync the offsets from source
> > (2) for a pair of <topic, partition>, if the converted CG offset is
> > greater than the real CG offset at destination, sync the offset from source
> >
> >
> > https://github.com/apache/kafka/blob/c19a35d1b740c85559a7ff4e882fc95b4737808d/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L270-L279
> >
> > If you have any suggested change, please share with me. Thanks
> >
> > On 2021/03/23 18:42:19, Frank Yi <fy...@coursera.org> wrote:
> > > Hey Alan,
> > >
> > > I'm running into the same issue as you and I believe I've figured it out.
> > >
> > > > I noticed that consumer partitions on the destination cluster that have
> > > > a LOG-END-OFFSET=0 all exhibit this issue. It looks like
> > > > MirrorMaker's offset sync does not work correctly if the partition is
> > > > empty. Instead of setting the destination's offset to 0 for these empty
> > > > partitions, it sets the destination's offset to the same numerical value
> > > > as the source's offset.
> > > >
> > > > If you now send a message to these partitions, making them non-empty, the
> > > > offset will still not update until the "correct" offset is greater than
> > > > the offset on the partition. The only workaround I've found is to make
> > > > these partitions non-empty, then delete the consumer group on the
> > > > destination cluster and let MirrorMaker resync it.
> > >
> > > Hope this helps,
> > > Frank
> > >
> > > On 2021/03/15 21:59:03, Alan Ning <a....@gmail.com> wrote:
> > > > > I am running MirrorMaker 2 (Kafka 2.7), trying to migrate all topics
> > > > > from one cluster to another while preserving consumer group offsets
> > > > > through `sync.group.offsets.enabled=true`. My source cluster is running
> > > > > Kafka 0.10, while the target cluster is running 2.6.1.
> > > > >
> > > > > While I can see data being replicated, the data on the replicated
> > > > > Consumer Group in the target cluster looks wrong. The lag values of the
> > > > > replicated Consumer Group are large negative values, and the
> > > > > LOG-END-OFFSET values are mostly 0. I determined this information from
> > > > > kafka-consumer-groups.sh.
> > > > >
> > > > > I checked the kafka_consumer_consumer_fetch_manager_metrics_records_lag
> > > > > JMX metrics in MM2 and the reported lag is zero for all partitions.
> > > > >
> > > > > By using `sync.group.offsets.enabled=true`, I envisioned that MM2 would
> > > > > automatically replicate and sync all Consumer Groups with a meaningful
> > > > > offset in the target cluster. Am I misunderstanding how MM2 is supposed
> > > > > to work?
> > > > >
> > > > > Here is my mm2.properties and the CG details.
> > > >
> > > > > # mm2.properties
> > > > > ```
> > > > > clusters = src, dst
> > > > > src.bootstrap.servers = 10.0.0.1:9092
> > > > > dst.bootstrap.servers = 10.0.0.2:9092
> > > > > src->dst.enabled = true
> > > > > src->dst.topics = compute.*
> > > > > src->dst.offset.flush.timeout.ms=60000
> > > > > src->dst.buffer.memory=10000
> > > > > dst->src.enabled = true
> > > > > dst->src.topics = .*
> > > > > replication.factor=3
> > > > > src->dst.sync.group.offsets.enabled = true
> > > > > src->dst.emit.checkpoints.enabled = true
> > > > > src->dst.consumer.auto.offset.reset=latest
> > > > > consumer.auto.offset.reset = latest
> > > > > auto.offset.reset = latest
> > > > > replication.policy.class = com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
> > > > > checkpoints.topic.replication.factor=3
> > > > > heartbeats.topic.replication.factor=3
> > > > > offset-syncs.topic.replication.factor=3
> > > > > offset.storage.replication.factor=3
> > > > > status.storage.replication.factor=3
> > > > > config.storage.replication.factor=3
> > > > > sync.topic.acls.enabled = false
> > > > > sync.group.offsets.enabled = true
> > > > > emit.checkpoints.enabled = true
> > > > > tasks.max = 8
> > > > > dst.producer.offset.flush.timeout.ms = 60000
> > > > > dst.offset.flush.timeout.ms = 60000
> > > > > ```
> > > >
> > > > > Consumer Group details
> > > > > ```
> > > > > GROUP                         TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
> > > > > kafka-group-Compute-Requests  Compute-Requests  57         5305947         0               -5305947  -            -     -
> > > > > kafka-group-Compute-Requests  Compute-Requests  20         5164205         0               -5164205  -            -     -
> > > > > kafka-group-Compute-Requests  Compute-Requests  53         4208527         0               -4208527  -            -     -
> > > > > kafka-group-Compute-Requests  Compute-Requests  82         5247928         0               -5247928  -            -     -
> > > > > kafka-group-Compute-Requests  Compute-Requests  65         5574520         0               -5574520  -            -     -
> > > > > kafka-group-Compute-Requests  Compute-Requests  11         5190708         209             -5190499  -            -     -
> > > > > ```
> > > > >
> > > > > Thanks
> > > > >
> > > > > ... Alan
> > > >
> > >
> >
> 
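
For readers following the thread, the behaviour under discussion can be illustrated with a small model. This is only a sketch and not MirrorMaker code: the function names (`offsets_to_sync`, `clamp_for_empty_partitions`, `lag`) are hypothetical, the logic encodes just the two criteria as Ning describes them plus the change Frank suggests, and it assumes the value handed to the sync step for an empty target partition is the unconverted source offset, as Frank reports observing.

```python
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]


def offsets_to_sync(
    target_committed: Dict[TopicPartition, int],
    translated: Dict[TopicPartition, int],
) -> Dict[TopicPartition, int]:
    """Model of the two criteria described in the thread (hypothetical helper)."""
    updates = {}
    for tp, offset in translated.items():
        if tp not in target_committed:
            # Criterion (1): the target group has no offset for this partition.
            updates[tp] = offset
        elif offset > target_committed[tp]:
            # Criterion (2): the translated offset is ahead of the target offset.
            updates[tp] = offset
    return updates


def clamp_for_empty_partitions(
    updates: Dict[TopicPartition, int],
    target_log_end: Dict[TopicPartition, int],
) -> Dict[TopicPartition, int]:
    """Frank's suggestion: commit 0 when the target partition is empty.

    Assumption: a partition missing from target_log_end is treated as empty.
    """
    return {
        tp: 0 if target_log_end.get(tp, 0) == 0 else offset
        for tp, offset in updates.items()
    }


def lag(log_end_offset: int, committed_offset: int) -> int:
    """Lag as reported in the consumer group listing: log end minus committed."""
    return log_end_offset - committed_offset


if __name__ == "__main__":
    tp = ("Compute-Requests", 57)

    # Empty target partition, source offset passed through unconverted.
    first = offsets_to_sync({}, {tp: 5305947})
    print("lag after first sync:", lag(0, first[tp]))

    # One record arrives on the target; the "correct" offset 1 is not
    # greater than the stale committed offset, so nothing is updated.
    print("second sync updates:", offsets_to_sync(first, {tp: 1}))

    # With the suggested clamp, the empty partition gets offset 0 instead.
    fixed = clamp_for_empty_partitions(first, {tp: 0})
    print("lag with clamp:", lag(0, fixed[tp]))
```

In this model the stale offset also explains the workaround Frank mentions: deleting the consumer group on the destination removes the committed offset, so criterion (1) applies again on the next sync.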

Re: MirrorMaker 2 and Negative Lag

Posted by Frank Yi <fy...@coursera.org>.
Thanks Ning,

I created https://issues.apache.org/jira/browse/KAFKA-12635 . Looks like I
don't have permissions to assign it to you though.
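
For anyone who wants to check whether their own replicated groups show the symptom described in the ticket, a rough way is to scan the output of kafka-consumer-groups.sh for rows with negative lag. The sketch below assumes the column order shown in Alan's listing (GROUP, TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, ...), which may differ between Kafka versions; `GroupRow`, `parse_rows` and `negative_lag` are hypothetical names, not part of any Kafka tooling.

```python
from typing import List, NamedTuple


class GroupRow(NamedTuple):
    group: str
    topic: str
    partition: int
    current_offset: int
    log_end_offset: int
    lag: int


def parse_rows(text: str) -> List[GroupRow]:
    """Parse whitespace-separated rows; assumes the column order from the thread."""
    rows = []
    for line in text.splitlines():
        fields = line.split()
        if len(fields) < 6 or fields[0] == "GROUP":
            continue  # skip blank lines and the header row
        try:
            partition, current, log_end, lag = (int(f) for f in fields[2:6])
        except ValueError:
            continue  # e.g. "-" where no offset has been committed
        rows.append(GroupRow(fields[0], fields[1], partition, current, log_end, lag))
    return rows


def negative_lag(rows: List[GroupRow]) -> List[GroupRow]:
    """Rows whose committed offset is ahead of the log end offset."""
    return [row for row in rows if row.lag < 0]


# Two rows copied from the listing earlier in the thread.
SAMPLE = """\
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
kafka-group-Compute-Requests Compute-Requests 57 5305947 0   -5305947 - - -
kafka-group-Compute-Requests Compute-Requests 11 5190708 209 -5190499 - - -
"""

if __name__ == "__main__":
    for row in negative_lag(parse_rows(SAMPLE)):
        print(row.topic, row.partition, row.current_offset, row.log_end_offset, row.lag)
```

Partition 11 in the sample shows that the problem is not limited to completely empty partitions once records start arriving: the log end offset is 209 while the committed offset is still the old source value.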


