You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Sunny Lohani <su...@gmail.com> on 2020/08/04 14:28:05 UTC

Kafka Mirror Maker 2: RemoteClusterUtils.translateOffsets() returning empty map

Hi,

I have 2 data centers, each having single node Zookeeper and Kafka cluster.
I have a topic (single partition) in both the data center kafka clusters. I
am using MM 2.0 as a dedicated cluster for bi-directional replication of
the topic as well as using RemoteClusterUtils.translateOffsets() in my
application for offset translation during failover. But the method is
returning an empty map due to which the consumer is not resuming from
proper offsets for local/remote topics.

When I investigated further, I found that the checkpoint
topics A.checkpoints.internal and B.checkpoints.internal in respective
clusters do not have any kafka message. I don't see any errors in the
mirror maker console logs. I searched everywhere on the internet but could
not get any help. Below is the mm.properties:

clusters = A, B
A.bootstrap.servers = 10.34.45.113:19092
B.bootstrap.servers = 10.34.45.113:29092

A->B.enabled = true
A->B.topics = .*
B->A.enabled = true
B->A.topics = .*

# Setting replication factor of newly created remote topics
replication.factor=1

checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

sync.topic.acls.enabled = false

emit.checkpoints.enabled = true
emit.checkpoints.interval.seconds = 5
----

Need help on this urgently. Thanks in advance.

Thanks & Regards,
Sunny Kumar Lohani,

Re: Kafka Mirror Maker 2: RemoteClusterUtils.translateOffsets() returning empty map

Posted by Sunny Lohani <su...@gmail.com>.
Hello Kafka community,

I struggled with this issue for a long time and finally figured out the
reason.

*Issue*: MM2 not emitting checkpoints for my consumer, which is a Apache
Flink job with Kafka source connector. Due to this, I’m not able to do the
failover. Bi-directional cross-DC replication is working fine.

*Reason: *Flink kafka connector internally uses assign() API for the
consumer. This is the reason mm2 is not emitting checkpoints for my
consumer group. When I tested mm2 with a simple consumer using subscribe()
API, it emits checkpoints and I’m able to do the failover and offset
translation using RemoteClusterUtils class.

Can someone please tell me if it’s a bug in mm2 or it is an expected
behaviour ? I already see few mails earlier asking on this same issue
related to assign() API but they are unanswered. It’ll be great if someone
can please explain this.

Thanks & Regards

On Wed, 5 Aug 2020 at 1:57 AM, Sunny Lohani <su...@gmail.com> wrote:

> Hi Ryanne,
>
> I tried setting the groups in mm2.properties, still no luck. And, the
> consumer application is not kafka-console-consumer, so no issues there.
> I noticed in the MM logs that MirrorCheckpointConnector is created but
> without any connector task, not even a single error in the logs though. I
> feel this is the reason that the checkpoints topics are empty. Below are
> some log traces which indicate the same:
>
> [2020-08-05 01:38:12,349] INFO Started MirrorCheckpointConnector with 0
> consumer groups.
> (org.apache.kafka.connect.mirror.MirrorCheckpointConnector:79)
> [2020-08-05 01:38:12,350] INFO Finished creating connector
> MirrorCheckpointConnector (org.apache.kafka.connect.runtime.Worker:273)
> ...
> [2020-08-05 01:46:12,145] INFO [Worker clientId=connect-1, groupId=B-mm2]
> Joined group at generation 8 with protocol version 2 and got assignment:
> Assignment{error=0,
> leader='connect-1-6529239d-299d-4fc9-aea7-80c94f0838ab',
> leaderUrl='NOTUSED/B', offset=20, connectorIds=[MirrorSourceConnector,
> MirrorHeartbeatConnector, MirrorCheckpointConnector],
> taskIds=[MirrorSourceConnector-0, MirrorHeartbeatConnector-0],
> revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1549)
>
> Let me know your thoughts as to why the MirrorCheckpointConnector task is
> not getting created ?
>
> Also, just fyi, I am using KafkaCat to produce messages into the local
> topics of the kafka clusters. I hope this should not create any issues.
>
> --
> Sunny
>
>
> On Tue, Aug 4, 2020 at 9:17 PM Ryanne Dolan <ry...@gmail.com> wrote:
>
>> Sunny, check the groups and groups.blacklist properties. By default, MM2
>> won't replicate consumer groups from kafka-console-consumer, for example.
>> Sometimes that confuses people when testing MM2.
>>
>> Also check the logs to see if there is any reason
>> MirrorCheckpointConnector
>> might be failing to start.
>>
>> Ryanne
>>
>> On Tue, Aug 4, 2020 at 10:27 AM Sunny Lohani <su...@gmail.com>
>> wrote:
>>
>> > Hi Ryanne,
>> >
>> > First of all, thanks for a quick revert. Actually, I have a consumer
>> > application consuming messages in cluster A and then I failover the
>> > consumer from A to cluster B. Also, the consumer is subscribed to both
>> > local and remote topics. Let's say the local topic name in both the
>> > clusters A and B is test-topic. Here is a sequence of steps that I am
>> > following before and after the failover:
>> >
>> > 1. Consumer application subscribes to topics test-topic and
>> B.test-topic in
>> > cluster A with group.id "test-consumer"
>> > 2. It consumes some messages from both the topics.
>> > 3. Now, we stop the consumer application and restart it pointing to
>> cluster
>> > B, subscribed to topics test-topic and A.test-topic, with group.id
>> > "test-consumer". I use the RemoteClusterUtils.translateOffsets() here.
>> > 4. The method returns an empty map as well as the checkpoints topic in
>> both
>> > the clusters is empty.
>> >
>> > Let me know if you see anything wrong here.
>> >
>> > Thanks,
>> > Sunny
>> >
>> >
>> > On Tue, Aug 4, 2020 at 8:26 PM Ryanne Dolan <ry...@gmail.com>
>> wrote:
>> >
>> > > Sunny, is it possible there are no consumer groups? There will be no
>> > > checkpoints, and thus nothing to use for offset translation, if there
>> are
>> > > no upstream consumer groups.
>> > >
>> > > Ryanne
>> > >
>> > > On Tue, Aug 4, 2020, 9:28 AM Sunny Lohani <su...@gmail.com>
>> > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I have 2 data centers, each having single node Zookeeper and Kafka
>> > > cluster.
>> > > > I have a topic (single partition) in both the data center kafka
>> > > clusters. I
>> > > > am using MM 2.0 as a dedicated cluster for bi-directional
>> replication
>> > of
>> > > > the topic as well as using RemoteClusterUtils.translateOffsets() in
>> my
>> > > > application for offset translation during failover. But the method
>> is
>> > > > returning an empty map due to which the consumer is not resuming
>> from
>> > > > proper offsets for local/remote topics.
>> > > >
>> > > > When I investigated further, I found that the checkpoint
>> > > > topics A.checkpoints.internal and B.checkpoints.internal in
>> respective
>> > > > clusters do not have any kafka message. I don't see any errors in
>> the
>> > > > mirror maker console logs. I searched everywhere on the internet but
>> > > could
>> > > > not get any help. Below is the mm.properties:
>> > > >
>> > > > clusters = A, B
>> > > > A.bootstrap.servers = 10.34.45.113:19092
>> > > > B.bootstrap.servers = 10.34.45.113:29092
>> > > >
>> > > > A->B.enabled = true
>> > > > A->B.topics = .*
>> > > > B->A.enabled = true
>> > > > B->A.topics = .*
>> > > >
>> > > > # Setting replication factor of newly created remote topics
>> > > > replication.factor=1
>> > > >
>> > > > checkpoints.topic.replication.factor=1
>> > > > heartbeats.topic.replication.factor=1
>> > > > offset-syncs.topic.replication.factor=1
>> > > >
>> > > > offset.storage.replication.factor=1
>> > > > status.storage.replication.factor=1
>> > > > config.storage.replication.factor=1
>> > > >
>> > > > sync.topic.acls.enabled = false
>> > > >
>> > > > emit.checkpoints.enabled = true
>> > > > emit.checkpoints.interval.seconds = 5
>> > > > ----
>> > > >
>> > > > Need help on this urgently. Thanks in advance.
>> > > >
>> > > > Thanks & Regards,
>> > > > Sunny Kumar Lohani,
>> > > >
>> > >
>> >
>>
> --
Sunny Kumar Lohani,
Sr. SW Engineer,
Visa Inc.,
Bangalore (India)

Re: Kafka Mirror Maker 2: RemoteClusterUtils.translateOffsets() returning empty map

Posted by Sunny Lohani <su...@gmail.com>.
Hi Ryanne,

I tried setting the groups in mm2.properties, still no luck. And, the
consumer application is not kafka-console-consumer, so no issues there.
I noticed in the MM logs that MirrorCheckpointConnector is created but
without any connector task, not even a single error in the logs though. I
feel this is the reason that the checkpoints topics are empty. Below are
some log traces which indicate the same:

[2020-08-05 01:38:12,349] INFO Started MirrorCheckpointConnector with 0
consumer groups.
(org.apache.kafka.connect.mirror.MirrorCheckpointConnector:79)
[2020-08-05 01:38:12,350] INFO Finished creating connector
MirrorCheckpointConnector (org.apache.kafka.connect.runtime.Worker:273)
...
[2020-08-05 01:46:12,145] INFO [Worker clientId=connect-1, groupId=B-mm2]
Joined group at generation 8 with protocol version 2 and got assignment:
Assignment{error=0,
leader='connect-1-6529239d-299d-4fc9-aea7-80c94f0838ab',
leaderUrl='NOTUSED/B', offset=20, connectorIds=[MirrorSourceConnector,
MirrorHeartbeatConnector, MirrorCheckpointConnector],
taskIds=[MirrorSourceConnector-0, MirrorHeartbeatConnector-0],
revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1549)

Let me know your thoughts as to why the MirrorCheckpointConnector task is
not getting created ?

Also, just fyi, I am using KafkaCat to produce messages into the local
topics of the kafka clusters. I hope this should not create any issues.

--
Sunny


On Tue, Aug 4, 2020 at 9:17 PM Ryanne Dolan <ry...@gmail.com> wrote:

> Sunny, check the groups and groups.blacklist properties. By default, MM2
> won't replicate consumer groups from kafka-console-consumer, for example.
> Sometimes that confuses people when testing MM2.
>
> Also check the logs to see if there is any reason MirrorCheckpointConnector
> might be failing to start.
>
> Ryanne
>
> On Tue, Aug 4, 2020 at 10:27 AM Sunny Lohani <su...@gmail.com>
> wrote:
>
> > Hi Ryanne,
> >
> > First of all, thanks for a quick revert. Actually, I have a consumer
> > application consuming messages in cluster A and then I failover the
> > consumer from A to cluster B. Also, the consumer is subscribed to both
> > local and remote topics. Let's say the local topic name in both the
> > clusters A and B is test-topic. Here is a sequence of steps that I am
> > following before and after the failover:
> >
> > 1. Consumer application subscribes to topics test-topic and B.test-topic
> in
> > cluster A with group.id "test-consumer"
> > 2. It consumes some messages from both the topics.
> > 3. Now, we stop the consumer application and restart it pointing to
> cluster
> > B, subscribed to topics test-topic and A.test-topic, with group.id
> > "test-consumer". I use the RemoteClusterUtils.translateOffsets() here.
> > 4. The method returns an empty map as well as the checkpoints topic in
> both
> > the clusters is empty.
> >
> > Let me know if you see anything wrong here.
> >
> > Thanks,
> > Sunny
> >
> >
> > On Tue, Aug 4, 2020 at 8:26 PM Ryanne Dolan <ry...@gmail.com>
> wrote:
> >
> > > Sunny, is it possible there are no consumer groups? There will be no
> > > checkpoints, and thus nothing to use for offset translation, if there
> are
> > > no upstream consumer groups.
> > >
> > > Ryanne
> > >
> > > On Tue, Aug 4, 2020, 9:28 AM Sunny Lohani <su...@gmail.com>
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > I have 2 data centers, each having single node Zookeeper and Kafka
> > > cluster.
> > > > I have a topic (single partition) in both the data center kafka
> > > clusters. I
> > > > am using MM 2.0 as a dedicated cluster for bi-directional replication
> > of
> > > > the topic as well as using RemoteClusterUtils.translateOffsets() in
> my
> > > > application for offset translation during failover. But the method is
> > > > returning an empty map due to which the consumer is not resuming from
> > > > proper offsets for local/remote topics.
> > > >
> > > > When I investigated further, I found that the checkpoint
> > > > topics A.checkpoints.internal and B.checkpoints.internal in
> respective
> > > > clusters do not have any kafka message. I don't see any errors in the
> > > > mirror maker console logs. I searched everywhere on the internet but
> > > could
> > > > not get any help. Below is the mm.properties:
> > > >
> > > > clusters = A, B
> > > > A.bootstrap.servers = 10.34.45.113:19092
> > > > B.bootstrap.servers = 10.34.45.113:29092
> > > >
> > > > A->B.enabled = true
> > > > A->B.topics = .*
> > > > B->A.enabled = true
> > > > B->A.topics = .*
> > > >
> > > > # Setting replication factor of newly created remote topics
> > > > replication.factor=1
> > > >
> > > > checkpoints.topic.replication.factor=1
> > > > heartbeats.topic.replication.factor=1
> > > > offset-syncs.topic.replication.factor=1
> > > >
> > > > offset.storage.replication.factor=1
> > > > status.storage.replication.factor=1
> > > > config.storage.replication.factor=1
> > > >
> > > > sync.topic.acls.enabled = false
> > > >
> > > > emit.checkpoints.enabled = true
> > > > emit.checkpoints.interval.seconds = 5
> > > > ----
> > > >
> > > > Need help on this urgently. Thanks in advance.
> > > >
> > > > Thanks & Regards,
> > > > Sunny Kumar Lohani,
> > > >
> > >
> >
>

Re: Kafka Mirror Maker 2: RemoteClusterUtils.translateOffsets() returning empty map

Posted by Ryanne Dolan <ry...@gmail.com>.
Sunny, check the groups and groups.blacklist properties. By default, MM2
won't replicate consumer groups from kafka-console-consumer, for example.
Sometimes that confuses people when testing MM2.

Also check the logs to see if there is any reason MirrorCheckpointConnector
might be failing to start.

Ryanne

On Tue, Aug 4, 2020 at 10:27 AM Sunny Lohani <su...@gmail.com> wrote:

> Hi Ryanne,
>
> First of all, thanks for a quick revert. Actually, I have a consumer
> application consuming messages in cluster A and then I failover the
> consumer from A to cluster B. Also, the consumer is subscribed to both
> local and remote topics. Let's say the local topic name in both the
> clusters A and B is test-topic. Here is a sequence of steps that I am
> following before and after the failover:
>
> 1. Consumer application subscribes to topics test-topic and B.test-topic in
> cluster A with group.id "test-consumer"
> 2. It consumes some messages from both the topics.
> 3. Now, we stop the consumer application and restart it pointing to cluster
> B, subscribed to topics test-topic and A.test-topic, with group.id
> "test-consumer". I use the RemoteClusterUtils.translateOffsets() here.
> 4. The method returns an empty map as well as the checkpoints topic in both
> the clusters is empty.
>
> Let me know if you see anything wrong here.
>
> Thanks,
> Sunny
>
>
> On Tue, Aug 4, 2020 at 8:26 PM Ryanne Dolan <ry...@gmail.com> wrote:
>
> > Sunny, is it possible there are no consumer groups? There will be no
> > checkpoints, and thus nothing to use for offset translation, if there are
> > no upstream consumer groups.
> >
> > Ryanne
> >
> > On Tue, Aug 4, 2020, 9:28 AM Sunny Lohani <su...@gmail.com>
> wrote:
> >
> > > Hi,
> > >
> > > I have 2 data centers, each having single node Zookeeper and Kafka
> > cluster.
> > > I have a topic (single partition) in both the data center kafka
> > clusters. I
> > > am using MM 2.0 as a dedicated cluster for bi-directional replication
> of
> > > the topic as well as using RemoteClusterUtils.translateOffsets() in my
> > > application for offset translation during failover. But the method is
> > > returning an empty map due to which the consumer is not resuming from
> > > proper offsets for local/remote topics.
> > >
> > > When I investigated further, I found that the checkpoint
> > > topics A.checkpoints.internal and B.checkpoints.internal in respective
> > > clusters do not have any kafka message. I don't see any errors in the
> > > mirror maker console logs. I searched everywhere on the internet but
> > could
> > > not get any help. Below is the mm.properties:
> > >
> > > clusters = A, B
> > > A.bootstrap.servers = 10.34.45.113:19092
> > > B.bootstrap.servers = 10.34.45.113:29092
> > >
> > > A->B.enabled = true
> > > A->B.topics = .*
> > > B->A.enabled = true
> > > B->A.topics = .*
> > >
> > > # Setting replication factor of newly created remote topics
> > > replication.factor=1
> > >
> > > checkpoints.topic.replication.factor=1
> > > heartbeats.topic.replication.factor=1
> > > offset-syncs.topic.replication.factor=1
> > >
> > > offset.storage.replication.factor=1
> > > status.storage.replication.factor=1
> > > config.storage.replication.factor=1
> > >
> > > sync.topic.acls.enabled = false
> > >
> > > emit.checkpoints.enabled = true
> > > emit.checkpoints.interval.seconds = 5
> > > ----
> > >
> > > Need help on this urgently. Thanks in advance.
> > >
> > > Thanks & Regards,
> > > Sunny Kumar Lohani,
> > >
> >
>

Re: Kafka Mirror Maker 2: RemoteClusterUtils.translateOffsets() returning empty map

Posted by Sunny Lohani <su...@gmail.com>.
Hi Ryanne,

First of all, thanks for a quick revert. Actually, I have a consumer
application consuming messages in cluster A and then I failover the
consumer from A to cluster B. Also, the consumer is subscribed to both
local and remote topics. Let's say the local topic name in both the
clusters A and B is test-topic. Here is a sequence of steps that I am
following before and after the failover:

1. Consumer application subscribes to topics test-topic and B.test-topic in
cluster A with group.id "test-consumer"
2. It consumes some messages from both the topics.
3. Now, we stop the consumer application and restart it pointing to cluster
B, subscribed to topics test-topic and A.test-topic, with group.id
"test-consumer". I use the RemoteClusterUtils.translateOffsets() here.
4. The method returns an empty map as well as the checkpoints topic in both
the clusters is empty.

Let me know if you see anything wrong here.

Thanks,
Sunny


On Tue, Aug 4, 2020 at 8:26 PM Ryanne Dolan <ry...@gmail.com> wrote:

> Sunny, is it possible there are no consumer groups? There will be no
> checkpoints, and thus nothing to use for offset translation, if there are
> no upstream consumer groups.
>
> Ryanne
>
> On Tue, Aug 4, 2020, 9:28 AM Sunny Lohani <su...@gmail.com> wrote:
>
> > Hi,
> >
> > I have 2 data centers, each having single node Zookeeper and Kafka
> cluster.
> > I have a topic (single partition) in both the data center kafka
> clusters. I
> > am using MM 2.0 as a dedicated cluster for bi-directional replication of
> > the topic as well as using RemoteClusterUtils.translateOffsets() in my
> > application for offset translation during failover. But the method is
> > returning an empty map due to which the consumer is not resuming from
> > proper offsets for local/remote topics.
> >
> > When I investigated further, I found that the checkpoint
> > topics A.checkpoints.internal and B.checkpoints.internal in respective
> > clusters do not have any kafka message. I don't see any errors in the
> > mirror maker console logs. I searched everywhere on the internet but
> could
> > not get any help. Below is the mm.properties:
> >
> > clusters = A, B
> > A.bootstrap.servers = 10.34.45.113:19092
> > B.bootstrap.servers = 10.34.45.113:29092
> >
> > A->B.enabled = true
> > A->B.topics = .*
> > B->A.enabled = true
> > B->A.topics = .*
> >
> > # Setting replication factor of newly created remote topics
> > replication.factor=1
> >
> > checkpoints.topic.replication.factor=1
> > heartbeats.topic.replication.factor=1
> > offset-syncs.topic.replication.factor=1
> >
> > offset.storage.replication.factor=1
> > status.storage.replication.factor=1
> > config.storage.replication.factor=1
> >
> > sync.topic.acls.enabled = false
> >
> > emit.checkpoints.enabled = true
> > emit.checkpoints.interval.seconds = 5
> > ----
> >
> > Need help on this urgently. Thanks in advance.
> >
> > Thanks & Regards,
> > Sunny Kumar Lohani,
> >
>

Re: Kafka Mirror Maker 2: RemoteClusterUtils.translateOffsets() returning empty map

Posted by Ryanne Dolan <ry...@gmail.com>.
Sunny, is it possible there are no consumer groups? There will be no
checkpoints, and thus nothing to use for offset translation, if there are
no upstream consumer groups.

Ryanne

On Tue, Aug 4, 2020, 9:28 AM Sunny Lohani <su...@gmail.com> wrote:

> Hi,
>
> I have 2 data centers, each having single node Zookeeper and Kafka cluster.
> I have a topic (single partition) in both the data center kafka clusters. I
> am using MM 2.0 as a dedicated cluster for bi-directional replication of
> the topic as well as using RemoteClusterUtils.translateOffsets() in my
> application for offset translation during failover. But the method is
> returning an empty map due to which the consumer is not resuming from
> proper offsets for local/remote topics.
>
> When I investigated further, I found that the checkpoint
> topics A.checkpoints.internal and B.checkpoints.internal in respective
> clusters do not have any kafka message. I don't see any errors in the
> mirror maker console logs. I searched everywhere on the internet but could
> not get any help. Below is the mm.properties:
>
> clusters = A, B
> A.bootstrap.servers = 10.34.45.113:19092
> B.bootstrap.servers = 10.34.45.113:29092
>
> A->B.enabled = true
> A->B.topics = .*
> B->A.enabled = true
> B->A.topics = .*
>
> # Setting replication factor of newly created remote topics
> replication.factor=1
>
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
>
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
>
> sync.topic.acls.enabled = false
>
> emit.checkpoints.enabled = true
> emit.checkpoints.interval.seconds = 5
> ----
>
> Need help on this urgently. Thanks in advance.
>
> Thanks & Regards,
> Sunny Kumar Lohani,
>