You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Anatoliy Soldatov <ak...@avito.ru.INVALID> on 2019/03/05 11:05:57 UTC

Odd behaviour of MirrorMaker and ConsumerGroupCommand

Hello, guys!

I am not sure about offsets replicated by MirrorMaker.

I am replicating data from one Kafka cluster (let's say cluster A, Confluent Kafka 2.0) to another (cluster B, Confluent Kafka 2.1) with internal topics.
MirrorMaker lag is somewhere between 1-2k events.

I started replication after some time (some old events were removed because of retention).
Topics on both clusters have similar number of partitions.
ConsumerGroupCommand on cluster A and cluster B showing different results as below.

cluster A (source):
~> kafka-consumer-groups \
  --bootstrap-server clusterA:9092 \
  --describe \
  --group "some_group" | sort

------------------------------
TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID                                 HOST           CLIENT-ID
some_topic             0          560498          560498          0     sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             1          560569          560571          2     sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             2          560478          560480          2     sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             3          560528          560530          2     sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             4          560542          560543          1     sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             5          560497          560498          1     sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             6          560484          560484          0     sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             7          560527          560527          0     sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             8          560539          560540          1     sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama

cluster B (destination):
~> kafka-consumer-groups \
  --bootstrap-server clusterB:9092 \
  --describe \
  --group "some_group" | sort

------------------------------
Consumer group 'some_group' has no active members.

TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID     HOST            CLIENT-ID
some_topic             0          373323          481950          108627   -               -               -


However, offset metadata in __consumer_offsets topic is the same on both clusters.

cluster A (source):
~> kafka-console-consumer \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server clusterA:9092 \
--topic __consumer_offsets | grep some_topic

------------------------------
[some_group,some_topic,7]::[OffsetMetadata[561492,NO_METADATA],CommitTime 1551782709369,ExpirationTime 1552387509369]
...

cluster B (destination):
~> kafka-console-consumer \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server clusterB:9092 \
--topic __consumer_offsets | grep some_topic

------------------------------
[some_group,some_topic,7]::OffsetAndMetadata(offset=561492, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1551782709369, expireTimestamp=Some(1552387509369))
...

Notice, that offsets matches output of ConsumerGroupCommand for cluster A, but not for cluster B.

So, there are my questions:
Why ConsumerGroupCommand showing different results on cluster A and cluster B?
Why consumer lag is so high on cluster B?
Is ConsumerGroupCommand using info not from __consumer_offsets topic?
Will all my consumers stuck (because of incompatible offsets) in case of failover?

Kind regards,
Tolya


________________________________
"This message contains confidential information/commercial secret. If you are not the intended addressee of this message you may not copy, save, print or forward it to any third party and you are kindly requested to destroy this message and notify the sender thereof by email.
?????? ????????? ???????? ???????????????? ??????????/??????????, ?????????? ???????????? ??????. ???? ?? ?? ????????? ?????????? ????????? ??????? ?????????, ?? ?? ?????? ??????????, ?????????, ???????? ??? ?????????? ??? ????? ???? ???? ?????. ??????? ?????????? ?????? ????????? ? ????????? ?? ???? ??????????? ??????????? ???????."

Re: Odd behaviour of MirrorMaker and ConsumerGroupCommand

Posted by Anatoliy Soldatov <ak...@avito.ru.INVALID>.
Ryanne, thank you!

Now it is clear, why offsets on rc behave like this.

Tolya

5 марта 2019 г., в 20:05, Ryanne Dolan <ry...@gmail.com>> написал(а):

Tolya,

You mentioned that you are replicating "with internal topics", so I'd
expect the __consumer_offsets topic in the target cluster to include (at
least) the same records as the source cluster. MirrorMaker does not
translate offsets, so the downstream commits will be wrong if you try to
replicate __consumer_offsets like that.

Re why kafka-consumer-groups is reporting different information, I suspect
that the downstream __consumer_offsets topic does not have the correct
number of partitions. If __consumer_offsets was created by MirrorMaker
during replication, it would have been created with the cluster's default
number of partitions, which is not the same as
offsets.topic.num.partitions. In this case, the semantic partitioning will
be broken, and kafka-consumer-groups (or indeed KafkaConsumer) will be
confused.

Re why kafka-console-consumer is showing slightly different records, I
believe there must be a string serde somewhere in your pipeline. It appears
that the offset record value has been toString'd.

let consumers read from specific time (not offset number). Should it work?

Yes, that is the current best practice, though there are many reasons why
this is less than ideal.

Ryanne


On Tue, Mar 5, 2019 at 10:07 AM Anatoliy Soldatov
<ak...@avito.ru.invalid>> wrote:

Hello, Ryanne and thank you for you answer!

I am using idempotent producers. And you are right, I started replication
after few days and some of source data were already deleted (because of
retention) at that moment.

Still I couldn’t understand logic behind Kafka-consumer-groups. With
console consumer I could see that __consumer_offsets topic on both source
and destination  clusters consists of same data (more or less).
OffsetMetadata and commit time are identical for some randomly picked
events from this topic.

However, Kafka-consumer-groups shows really different output for source
and destination cluster for the same consumer group. As in my previous
letter, it shows something around 560000 current offset + 560000 end offset
on source cluster  and 370000 current offset + 480000 end offset on
destination cluster.  But console consumer shows that on both source and
destination __consumer_offsets topic has events with 560000 offsets for
this group. And Kafka-consumer-groups shows only one partition for the
group on destination cluster. But there are 9 partitions on both clusters.

Also, as far as I know, partitioner class should be different for typical
topics and  __consumer_offsets topic (different hash keys). Is it correct?
If so, how MirrorMaker producer handles it?

I have an idea for failover – let consumers read from specific time (not
offset number). Should it work?

Also, I think MM2 Is a nice idea and waiting for it!

Tolya

5 марта 2019 г., в 18:08, Ryanne Dolan <ry...@gmail.com>>
написал(а):

Tolya,

That is the expected behavior. Offsets are not consistent between
mirrored
clusters.

Kafka allows duplicate records ("at least once"), which means the
downstream offsets will tend to creep higher than those in the source
partitions. For example, if a producer sends a record but doesn't receive
an ACK within a time out, it may resend the same record again. But the
record may have actually been received by the broker, so now the broker
sees the same record twice.

You can use an idempotent producer to prevent duplicates and transactions
for exactly-once replication, but even so, there is no guarantee the
offsets are consistent. For example, a source partition doesn't
necessarily
start at offset zero when you start replicating it.

You are correct that failover will not work as you were expecting. I've
solved this problem in KIP-382 with "MirrorMaker 2.0", which is currently
implemented in a draft PR here:


https://emea01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fkafka%2Fpull%2F6295&amp;data=02%7C01%7Caksoldatov%40avito.ru%7C5736c1ed9a3b418234cd08d6a18cd493%7Caf0e07b3b90b472392e63fab11dd5396%7C1%7C0%7C636874023563595939&amp;sdata=h7pxhnNX7HDu9hHRY64y4aoj7AyaZIUavNNDWgbNjPw%3D&amp;reserved=0

MM2 uses a sparse "offset sync" topic to keep track of the mapping
between
upstream and downstream offsets, and emits checkpoints that consumers can
use for failover and failback. This can be automated, e.g. by resetting
consumer offsets based on the latest checkpoint from another cluster. The
tooling has not been released yet, but the logic is in
RemoteClusterUtils.

Ryanne

On Tue, Mar 5, 2019, 5:06 AM Anatoliy Soldatov
<ak...@avito.ru.invalid>>
wrote:

Hello, guys!

I am not sure about offsets replicated by MirrorMaker.

I am replicating data from one Kafka cluster (let's say cluster A,
Confluent Kafka 2.0) to another (cluster B, Confluent Kafka 2.1) with
internal topics.
MirrorMaker lag is somewhere between 1-2k events.

I started replication after some time (some old events were removed
because of retention).
Topics on both clusters have similar number of partitions.
ConsumerGroupCommand on cluster A and cluster B showing different
results
as below.

cluster A (source):
~> kafka-consumer-groups \
--bootstrap-server clusterA:9092 \
--describe \
--group "some_group" | sort

------------------------------
TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
CONSUMER-ID                                 HOST           CLIENT-ID
some_topic             0          560498          560498          0
sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             1          560569          560571          2
sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             2          560478          560480          2
sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             3          560528          560530          2
sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             4          560542          560543          1
sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             5          560497          560498          1
sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             6          560484          560484          0
sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             7          560527          560527          0
sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
some_topic             8          560539          560540          1
sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama

cluster B (destination):
~> kafka-consumer-groups \
--bootstrap-server clusterB:9092 \
--describe \
--group "some_group" | sort

------------------------------
Consumer group 'some_group' has no active members.

TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
CONSUMER-ID     HOST            CLIENT-ID
some_topic             0          373323          481950          108627
-               -               -


However, offset metadata in __consumer_offsets topic is the same on both
clusters.

cluster A (source):
~> kafka-console-consumer \
--formatter
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
\
--bootstrap-server clusterA:9092 \
--topic __consumer_offsets | grep some_topic

------------------------------

[some_group,some_topic,7]::[OffsetMetadata[561492,NO_METADATA],CommitTime
1551782709369,ExpirationTime 1552387509369]
...

cluster B (destination):
~> kafka-console-consumer \
--formatter
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
\
--bootstrap-server clusterB:9092 \
--topic __consumer_offsets | grep some_topic

------------------------------
[some_group,some_topic,7]::OffsetAndMetadata(offset=561492,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1551782709369,
expireTimestamp=Some(1552387509369))
...

Notice, that offsets matches output of ConsumerGroupCommand for cluster
A,
but not for cluster B.

So, there are my questions:
Why ConsumerGroupCommand showing different results on cluster A and
cluster B?
Why consumer lag is so high on cluster B?
Is ConsumerGroupCommand using info not from __consumer_offsets topic?
Will all my consumers stuck (because of incompatible offsets) in case of
failover?

Kind regards,
Tolya


________________________________
"This message contains confidential information/commercial secret. If
you
are not the intended addressee of this message you may not copy, save,
print or forward it to any third party and you are kindly requested to
destroy this message and notify the sender thereof by email.
?????? ????????? ???????? ???????????????? ??????????/??????????,
?????????? ???????????? ??????. ???? ?? ?? ????????? ??????????
?????????
??????? ?????????, ?? ?? ?????? ??????????, ?????????, ???????? ???
?????????? ??? ????? ???? ???? ?????. ??????? ?????????? ??????
????????? ?
????????? ?? ???? ??????????? ??????????? ???????."



________________________________
"This message contains confidential information/commercial secret. If you
are not the intended addressee of this message you may not copy, save,
print or forward it to any third party and you are kindly requested to
destroy this message and notify the sender thereof by email.
Данное сообщение содержит конфиденциальную информацию/информацию,
являющуюся коммерческой тайной. Если Вы не являетесь надлежащим адресатом
данного сообщения, Вы не вправе копировать, сохранять, печатать или
пересылать его каким либо иным лицам. Просьба уничтожить данное сообщение и
уведомить об этом отправителя электронным письмом.”


________________________________
"This message contains confidential information/commercial secret. If you are not the intended addressee of this message you may not copy, save, print or forward it to any third party and you are kindly requested to destroy this message and notify the sender thereof by email.
Данное сообщение содержит конфиденциальную информацию/информацию, являющуюся коммерческой тайной. Если Вы не являетесь надлежащим адресатом данного сообщения, Вы не вправе копировать, сохранять, печатать или пересылать его каким либо иным лицам. Просьба уничтожить данное сообщение и уведомить об этом отправителя электронным письмом.”

Re: Odd behaviour of MirrorMaker and ConsumerGroupCommand

Posted by Ryanne Dolan <ry...@gmail.com>.
Tolya,

You mentioned that you are replicating "with internal topics", so I'd
expect the __consumer_offsets topic in the target cluster to include (at
least) the same records as the source cluster. MirrorMaker does not
translate offsets, so the downstream commits will be wrong if you try to
replicate __consumer_offsets like that.

Re why kafka-consumer-groups is reporting different information, I suspect
that the downstream __consumer_offsets topic does not have the correct
number of partitions. If __consumer_offsets was created by MirrorMaker
during replication, it would have been created with the cluster's default
number of partitions, which is not the same as
offsets.topic.num.partitions. In this case, the semantic partitioning will
be broken, and kafka-consumer-groups (or indeed KafkaConsumer) will be
confused.

Re why kafka-console-consumer is showing slightly different records, I
believe there must be a string serde somewhere in your pipeline. It appears
that the offset record value has been toString'd.

> let consumers read from specific time (not offset number). Should it work?

Yes, that is the current best practice, though there are many reasons why
this is less than ideal.

Ryanne


On Tue, Mar 5, 2019 at 10:07 AM Anatoliy Soldatov
<ak...@avito.ru.invalid> wrote:

> Hello, Ryanne and thank you for you answer!
>
> I am using idempotent producers. And you are right, I started replication
> after few days and some of source data were already deleted (because of
> retention) at that moment.
>
> Still I couldn’t understand logic behind Kafka-consumer-groups. With
> console consumer I could see that __consumer_offsets topic on both source
> and destination  clusters consists of same data (more or less).
> OffsetMetadata and commit time are identical for some randomly picked
> events from this topic.
>
> However, Kafka-consumer-groups shows really different output for source
> and destination cluster for the same consumer group. As in my previous
> letter, it shows something around 560000 current offset + 560000 end offset
> on source cluster  and 370000 current offset + 480000 end offset on
> destination cluster.  But console consumer shows that on both source and
> destination __consumer_offsets topic has events with 560000 offsets for
> this group. And Kafka-consumer-groups shows only one partition for the
> group on destination cluster. But there are 9 partitions on both clusters.
>
> Also, as far as I know, partitioner class should be different for typical
> topics and  __consumer_offsets topic (different hash keys). Is it correct?
> If so, how MirrorMaker producer handles it?
>
> I have an idea for failover – let consumers read from specific time (not
> offset number). Should it work?
>
> Also, I think MM2 Is a nice idea and waiting for it!
>
> Tolya
>
> > 5 марта 2019 г., в 18:08, Ryanne Dolan <ry...@gmail.com>
> написал(а):
> >
> > Tolya,
> >
> > That is the expected behavior. Offsets are not consistent between
> mirrored
> > clusters.
> >
> > Kafka allows duplicate records ("at least once"), which means the
> > downstream offsets will tend to creep higher than those in the source
> > partitions. For example, if a producer sends a record but doesn't receive
> > an ACK within a time out, it may resend the same record again. But the
> > record may have actually been received by the broker, so now the broker
> > sees the same record twice.
> >
> > You can use an idempotent producer to prevent duplicates and transactions
> > for exactly-once replication, but even so, there is no guarantee the
> > offsets are consistent. For example, a source partition doesn't
> necessarily
> > start at offset zero when you start replicating it.
> >
> > You are correct that failover will not work as you were expecting. I've
> > solved this problem in KIP-382 with "MirrorMaker 2.0", which is currently
> > implemented in a draft PR here:
> >
> >
> https://emea01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fkafka%2Fpull%2F6295&amp;data=02%7C01%7Caksoldatov%40avito.ru%7Cf45fab1a7dae481f966708d6a17def52%7Caf0e07b3b90b472392e63fab11dd5396%7C1%7C0%7C636873959598377914&amp;sdata=Tmvj98a%2B%2Br84IFvJ7m78NDVjFh5FVfGQBJ9QEfoRoVw%3D&amp;reserved=0
> >
> > MM2 uses a sparse "offset sync" topic to keep track of the mapping
> between
> > upstream and downstream offsets, and emits checkpoints that consumers can
> > use for failover and failback. This can be automated, e.g. by resetting
> > consumer offsets based on the latest checkpoint from another cluster. The
> > tooling has not been released yet, but the logic is in
> RemoteClusterUtils.
> >
> > Ryanne
> >
> > On Tue, Mar 5, 2019, 5:06 AM Anatoliy Soldatov
> <ak...@avito.ru.invalid>
> > wrote:
> >
> >> Hello, guys!
> >>
> >> I am not sure about offsets replicated by MirrorMaker.
> >>
> >> I am replicating data from one Kafka cluster (let's say cluster A,
> >> Confluent Kafka 2.0) to another (cluster B, Confluent Kafka 2.1) with
> >> internal topics.
> >> MirrorMaker lag is somewhere between 1-2k events.
> >>
> >> I started replication after some time (some old events were removed
> >> because of retention).
> >> Topics on both clusters have similar number of partitions.
> >> ConsumerGroupCommand on cluster A and cluster B showing different
> results
> >> as below.
> >>
> >> cluster A (source):
> >> ~> kafka-consumer-groups \
> >>  --bootstrap-server clusterA:9092 \
> >>  --describe \
> >>  --group "some_group" | sort
> >>
> >> ------------------------------
> >> TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
> >> CONSUMER-ID                                 HOST           CLIENT-ID
> >> some_topic             0          560498          560498          0
> >> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> >> some_topic             1          560569          560571          2
> >> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> >> some_topic             2          560478          560480          2
> >> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> >> some_topic             3          560528          560530          2
> >> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> >> some_topic             4          560542          560543          1
> >> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> >> some_topic             5          560497          560498          1
> >> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> >> some_topic             6          560484          560484          0
> >> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> >> some_topic             7          560527          560527          0
> >> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> >> some_topic             8          560539          560540          1
> >> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> >>
> >> cluster B (destination):
> >> ~> kafka-consumer-groups \
> >>  --bootstrap-server clusterB:9092 \
> >>  --describe \
> >>  --group "some_group" | sort
> >>
> >> ------------------------------
> >> Consumer group 'some_group' has no active members.
> >>
> >> TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
> >> CONSUMER-ID     HOST            CLIENT-ID
> >> some_topic             0          373323          481950          108627
> >> -               -               -
> >>
> >>
> >> However, offset metadata in __consumer_offsets topic is the same on both
> >> clusters.
> >>
> >> cluster A (source):
> >> ~> kafka-console-consumer \
> >> --formatter
> >> "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
> \
> >> --bootstrap-server clusterA:9092 \
> >> --topic __consumer_offsets | grep some_topic
> >>
> >> ------------------------------
> >>
> [some_group,some_topic,7]::[OffsetMetadata[561492,NO_METADATA],CommitTime
> >> 1551782709369,ExpirationTime 1552387509369]
> >> ...
> >>
> >> cluster B (destination):
> >> ~> kafka-console-consumer \
> >> --formatter
> >> "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
> \
> >> --bootstrap-server clusterB:9092 \
> >> --topic __consumer_offsets | grep some_topic
> >>
> >> ------------------------------
> >> [some_group,some_topic,7]::OffsetAndMetadata(offset=561492,
> >> leaderEpoch=Optional.empty, metadata=, commitTimestamp=1551782709369,
> >> expireTimestamp=Some(1552387509369))
> >> ...
> >>
> >> Notice, that offsets matches output of ConsumerGroupCommand for cluster
> A,
> >> but not for cluster B.
> >>
> >> So, there are my questions:
> >> Why ConsumerGroupCommand showing different results on cluster A and
> >> cluster B?
> >> Why consumer lag is so high on cluster B?
> >> Is ConsumerGroupCommand using info not from __consumer_offsets topic?
> >> Will all my consumers stuck (because of incompatible offsets) in case of
> >> failover?
> >>
> >> Kind regards,
> >> Tolya
> >>
> >>
> >> ________________________________
> >> "This message contains confidential information/commercial secret. If
> you
> >> are not the intended addressee of this message you may not copy, save,
> >> print or forward it to any third party and you are kindly requested to
> >> destroy this message and notify the sender thereof by email.
> >> ?????? ????????? ???????? ???????????????? ??????????/??????????,
> >> ?????????? ???????????? ??????. ???? ?? ?? ????????? ??????????
> ?????????
> >> ??????? ?????????, ?? ?? ?????? ??????????, ?????????, ???????? ???
> >> ?????????? ??? ????? ???? ???? ?????. ??????? ?????????? ??????
> ????????? ?
> >> ????????? ?? ???? ??????????? ??????????? ???????."
> >>
>
>
> ________________________________
> "This message contains confidential information/commercial secret. If you
> are not the intended addressee of this message you may not copy, save,
> print or forward it to any third party and you are kindly requested to
> destroy this message and notify the sender thereof by email.
> Данное сообщение содержит конфиденциальную информацию/информацию,
> являющуюся коммерческой тайной. Если Вы не являетесь надлежащим адресатом
> данного сообщения, Вы не вправе копировать, сохранять, печатать или
> пересылать его каким либо иным лицам. Просьба уничтожить данное сообщение и
> уведомить об этом отправителя электронным письмом.”
>

Re: Odd behaviour of MirrorMaker and ConsumerGroupCommand

Posted by Anatoliy Soldatov <ak...@avito.ru.INVALID>.
Hello, Ryanne and thank you for you answer!

I am using idempotent producers. And you are right, I started replication after few days and some of source data were already deleted (because of retention) at that moment.

Still I couldn’t understand logic behind Kafka-consumer-groups. With console consumer I could see that __consumer_offsets topic on both source and destination  clusters consists of same data (more or less). OffsetMetadata and commit time are identical for some randomly picked events from this topic.

However, Kafka-consumer-groups shows really different output for source and destination cluster for the same consumer group. As in my previous letter, it shows something around 560000 current offset + 560000 end offset on source cluster  and 370000 current offset + 480000 end offset on destination cluster.  But console consumer shows that on both source and destination __consumer_offsets topic has events with 560000 offsets for this group. And Kafka-consumer-groups shows only one partition for the group on destination cluster. But there are 9 partitions on both clusters.

Also, as far as I know, partitioner class should be different for typical topics and  __consumer_offsets topic (different hash keys). Is it correct? If so, how MirrorMaker producer handles it?

I have an idea for failover – let consumers read from specific time (not offset number). Should it work?

Also, I think MM2 Is a nice idea and waiting for it!

Tolya

> 5 марта 2019 г., в 18:08, Ryanne Dolan <ry...@gmail.com> написал(а):
>
> Tolya,
>
> That is the expected behavior. Offsets are not consistent between mirrored
> clusters.
>
> Kafka allows duplicate records ("at least once"), which means the
> downstream offsets will tend to creep higher than those in the source
> partitions. For example, if a producer sends a record but doesn't receive
> an ACK within a time out, it may resend the same record again. But the
> record may have actually been received by the broker, so now the broker
> sees the same record twice.
>
> You can use an idempotent producer to prevent duplicates and transactions
> for exactly-once replication, but even so, there is no guarantee the
> offsets are consistent. For example, a source partition doesn't necessarily
> start at offset zero when you start replicating it.
>
> You are correct that failover will not work as you were expecting. I've
> solved this problem in KIP-382 with "MirrorMaker 2.0", which is currently
> implemented in a draft PR here:
>
> https://emea01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fkafka%2Fpull%2F6295&amp;data=02%7C01%7Caksoldatov%40avito.ru%7Cf45fab1a7dae481f966708d6a17def52%7Caf0e07b3b90b472392e63fab11dd5396%7C1%7C0%7C636873959598377914&amp;sdata=Tmvj98a%2B%2Br84IFvJ7m78NDVjFh5FVfGQBJ9QEfoRoVw%3D&amp;reserved=0
>
> MM2 uses a sparse "offset sync" topic to keep track of the mapping between
> upstream and downstream offsets, and emits checkpoints that consumers can
> use for failover and failback. This can be automated, e.g. by resetting
> consumer offsets based on the latest checkpoint from another cluster. The
> tooling has not been released yet, but the logic is in RemoteClusterUtils.
>
> Ryanne
>
> On Tue, Mar 5, 2019, 5:06 AM Anatoliy Soldatov <ak...@avito.ru.invalid>
> wrote:
>
>> Hello, guys!
>>
>> I am not sure about offsets replicated by MirrorMaker.
>>
>> I am replicating data from one Kafka cluster (let's say cluster A,
>> Confluent Kafka 2.0) to another (cluster B, Confluent Kafka 2.1) with
>> internal topics.
>> MirrorMaker lag is somewhere between 1-2k events.
>>
>> I started replication after some time (some old events were removed
>> because of retention).
>> Topics on both clusters have similar number of partitions.
>> ConsumerGroupCommand on cluster A and cluster B showing different results
>> as below.
>>
>> cluster A (source):
>> ~> kafka-consumer-groups \
>>  --bootstrap-server clusterA:9092 \
>>  --describe \
>>  --group "some_group" | sort
>>
>> ------------------------------
>> TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
>> CONSUMER-ID                                 HOST           CLIENT-ID
>> some_topic             0          560498          560498          0
>> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
>> some_topic             1          560569          560571          2
>> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
>> some_topic             2          560478          560480          2
>> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
>> some_topic             3          560528          560530          2
>> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
>> some_topic             4          560542          560543          1
>> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
>> some_topic             5          560497          560498          1
>> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
>> some_topic             6          560484          560484          0
>> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
>> some_topic             7          560527          560527          0
>> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
>> some_topic             8          560539          560540          1
>> sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
>>
>> cluster B (destination):
>> ~> kafka-consumer-groups \
>>  --bootstrap-server clusterB:9092 \
>>  --describe \
>>  --group "some_group" | sort
>>
>> ------------------------------
>> Consumer group 'some_group' has no active members.
>>
>> TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
>> CONSUMER-ID     HOST            CLIENT-ID
>> some_topic             0          373323          481950          108627
>> -               -               -
>>
>>
>> However, offset metadata in __consumer_offsets topic is the same on both
>> clusters.
>>
>> cluster A (source):
>> ~> kafka-console-consumer \
>> --formatter
>> "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
>> --bootstrap-server clusterA:9092 \
>> --topic __consumer_offsets | grep some_topic
>>
>> ------------------------------
>> [some_group,some_topic,7]::[OffsetMetadata[561492,NO_METADATA],CommitTime
>> 1551782709369,ExpirationTime 1552387509369]
>> ...
>>
>> cluster B (destination):
>> ~> kafka-console-consumer \
>> --formatter
>> "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
>> --bootstrap-server clusterB:9092 \
>> --topic __consumer_offsets | grep some_topic
>>
>> ------------------------------
>> [some_group,some_topic,7]::OffsetAndMetadata(offset=561492,
>> leaderEpoch=Optional.empty, metadata=, commitTimestamp=1551782709369,
>> expireTimestamp=Some(1552387509369))
>> ...
>>
>> Notice, that offsets matches output of ConsumerGroupCommand for cluster A,
>> but not for cluster B.
>>
>> So, there are my questions:
>> Why ConsumerGroupCommand showing different results on cluster A and
>> cluster B?
>> Why consumer lag is so high on cluster B?
>> Is ConsumerGroupCommand using info not from __consumer_offsets topic?
>> Will all my consumers stuck (because of incompatible offsets) in case of
>> failover?
>>
>> Kind regards,
>> Tolya
>>
>>
>> ________________________________
>> "This message contains confidential information/commercial secret. If you
>> are not the intended addressee of this message you may not copy, save,
>> print or forward it to any third party and you are kindly requested to
>> destroy this message and notify the sender thereof by email.
>> ?????? ????????? ???????? ???????????????? ??????????/??????????,
>> ?????????? ???????????? ??????. ???? ?? ?? ????????? ?????????? ?????????
>> ??????? ?????????, ?? ?? ?????? ??????????, ?????????, ???????? ???
>> ?????????? ??? ????? ???? ???? ?????. ??????? ?????????? ?????? ????????? ?
>> ????????? ?? ???? ??????????? ??????????? ???????."
>>


________________________________
"This message contains confidential information/commercial secret. If you are not the intended addressee of this message you may not copy, save, print or forward it to any third party and you are kindly requested to destroy this message and notify the sender thereof by email.
Данное сообщение содержит конфиденциальную информацию/информацию, являющуюся коммерческой тайной. Если Вы не являетесь надлежащим адресатом данного сообщения, Вы не вправе копировать, сохранять, печатать или пересылать его каким либо иным лицам. Просьба уничтожить данное сообщение и уведомить об этом отправителя электронным письмом.”

Re: Odd behaviour of MirrorMaker and ConsumerGroupCommand

Posted by Ryanne Dolan <ry...@gmail.com>.
Tolya,

That is the expected behavior. Offsets are not consistent between mirrored
clusters.

Kafka allows duplicate records ("at least once"), which means the
downstream offsets will tend to creep higher than those in the source
partitions. For example, if a producer sends a record but doesn't receive
an ACK within a time out, it may resend the same record again. But the
record may have actually been received by the broker, so now the broker
sees the same record twice.

You can use an idempotent producer to prevent duplicates and transactions
for exactly-once replication, but even so, there is no guarantee the
offsets are consistent. For example, a source partition doesn't necessarily
start at offset zero when you start replicating it.

You are correct that failover will not work as you were expecting. I've
solved this problem in KIP-382 with "MirrorMaker 2.0", which is currently
implemented in a draft PR here:

https://github.com/apache/kafka/pull/6295

MM2 uses a sparse "offset sync" topic to keep track of the mapping between
upstream and downstream offsets, and emits checkpoints that consumers can
use for failover and failback. This can be automated, e.g. by resetting
consumer offsets based on the latest checkpoint from another cluster. The
tooling has not been released yet, but the logic is in RemoteClusterUtils.

Ryanne

On Tue, Mar 5, 2019, 5:06 AM Anatoliy Soldatov <ak...@avito.ru.invalid>
wrote:

> Hello, guys!
>
> I am not sure about offsets replicated by MirrorMaker.
>
> I am replicating data from one Kafka cluster (let's say cluster A,
> Confluent Kafka 2.0) to another (cluster B, Confluent Kafka 2.1) with
> internal topics.
> MirrorMaker lag is somewhere between 1-2k events.
>
> I started replication after some time (some old events were removed
> because of retention).
> Topics on both clusters have similar number of partitions.
> ConsumerGroupCommand on cluster A and cluster B showing different results
> as below.
>
> cluster A (source):
> ~> kafka-consumer-groups \
>   --bootstrap-server clusterA:9092 \
>   --describe \
>   --group "some_group" | sort
>
> ------------------------------
> TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
>  CONSUMER-ID                                 HOST           CLIENT-ID
> some_topic             0          560498          560498          0
>  sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> some_topic             1          560569          560571          2
>  sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> some_topic             2          560478          560480          2
>  sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> some_topic             3          560528          560530          2
>  sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> some_topic             4          560542          560543          1
>  sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> some_topic             5          560497          560498          1
>  sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> some_topic             6          560484          560484          0
>  sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> some_topic             7          560527          560527          0
>  sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
> some_topic             8          560539          560540          1
>  sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e /x.x.x.x       sarama
>
> cluster B (destination):
> ~> kafka-consumer-groups \
>   --bootstrap-server clusterB:9092 \
>   --describe \
>   --group "some_group" | sort
>
> ------------------------------
> Consumer group 'some_group' has no active members.
>
> TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
> CONSUMER-ID     HOST            CLIENT-ID
> some_topic             0          373323          481950          108627
>  -               -               -
>
>
> However, offset metadata in __consumer_offsets topic is the same on both
> clusters.
>
> cluster A (source):
> ~> kafka-console-consumer \
> --formatter
> "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
> --bootstrap-server clusterA:9092 \
> --topic __consumer_offsets | grep some_topic
>
> ------------------------------
> [some_group,some_topic,7]::[OffsetMetadata[561492,NO_METADATA],CommitTime
> 1551782709369,ExpirationTime 1552387509369]
> ...
>
> cluster B (destination):
> ~> kafka-console-consumer \
> --formatter
> "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
> --bootstrap-server clusterB:9092 \
> --topic __consumer_offsets | grep some_topic
>
> ------------------------------
> [some_group,some_topic,7]::OffsetAndMetadata(offset=561492,
> leaderEpoch=Optional.empty, metadata=, commitTimestamp=1551782709369,
> expireTimestamp=Some(1552387509369))
> ...
>
> Notice, that offsets matches output of ConsumerGroupCommand for cluster A,
> but not for cluster B.
>
> So, there are my questions:
> Why ConsumerGroupCommand showing different results on cluster A and
> cluster B?
> Why consumer lag is so high on cluster B?
> Is ConsumerGroupCommand using info not from __consumer_offsets topic?
> Will all my consumers stuck (because of incompatible offsets) in case of
> failover?
>
> Kind regards,
> Tolya
>
>
> ________________________________
> "This message contains confidential information/commercial secret. If you
> are not the intended addressee of this message you may not copy, save,
> print or forward it to any third party and you are kindly requested to
> destroy this message and notify the sender thereof by email.
> ?????? ????????? ???????? ???????????????? ??????????/??????????,
> ?????????? ???????????? ??????. ???? ?? ?? ????????? ?????????? ?????????
> ??????? ?????????, ?? ?? ?????? ??????????, ?????????, ???????? ???
> ?????????? ??? ????? ???? ???? ?????. ??????? ?????????? ?????? ????????? ?
> ????????? ?? ???? ??????????? ??????????? ???????."
>