Posted to jira@kafka.apache.org by "Ananya (Jira)" <ji...@apache.org> on 2020/10/14 07:23:00 UTC

[jira] [Updated] (KAFKA-10609) Mirror Maker 2.0 RemoteClusterUtils do not return offset map for Assign Mode Consumers

     [ https://issues.apache.org/jira/browse/KAFKA-10609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ananya updated KAFKA-10609:
---------------------------
    Description: 
I was using MirrorMaker 2.0 to test its consumer checkpointing functionality and found that RemoteClusterUtils.translateOffsets does not return checkpoints for a consumer that runs in assign mode.

+*Setup Details*+

I am using MirrorMaker 2.0 from Kafka 2.5.0 (Scala 2.12).

 * Source cluster: 1 broker and 1 ZooKeeper node, Kafka 2.5.0 (Scala 2.12)
 * Target cluster: 1 broker and 1 ZooKeeper node, Kafka 2.5.0 (Scala 2.12)

I am only doing one-way replication, from the source cluster (A) to the target cluster (B).

+*Mirror Maker Config:*+

 
{code:java}
clusters = A, B
A.bootstrap.servers = localhost:9082
B.bootstrap.servers = localhost:9092

A->B.enabled = true
A->B.topics = .*
A->B.groups = .*

B->A.enabled = false
B->A.topics = .*

replication.factor=1

checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

emit.heartbeats.interval.seconds = 2
refresh.topics.interval.seconds=1
refresh.groups.interval.seconds=1
emit.checkpoints.interval.seconds=1
sync.topic.configs.enabled=true
sync.topic.configs.interval.seconds=1{code}
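Because the config above sets no replication.policy.class, MirrorMaker 2.0 should use its default replication policy, which (as far as I understand it in 2.5) prefixes replicated topics with the source alias and a "." separator, writes checkpoints for groups from A to A.checkpoints.internal on B, and keeps offset syncs for the A->B flow in mm2-offset-syncs.B.internal on A. The sketch below only mimics that naming in plain Java to show which topics to inspect; RemoteNames and its methods are hypothetical helpers, not the MirrorMaker API.

{code:java}
// Hypothetical helper that mimics the naming of MirrorMaker 2.0's default
// replication policy (separator "."). It is NOT part of the MirrorMaker API.
final class RemoteNames {
    private static final String SEPARATOR = ".";

    // Topic "testTopic" from cluster "A" appears on the target cluster as "A.testTopic".
    static String remoteTopic(String sourceAlias, String topic) {
        return sourceAlias + SEPARATOR + topic;
    }

    // Checkpoints for consumer groups of cluster "A" are written to this topic on the target cluster.
    static String checkpointsTopic(String sourceAlias) {
        return sourceAlias + ".checkpoints.internal";
    }

    // Offset syncs for the A->B flow are kept on the source cluster in this topic.
    static String offsetSyncsTopic(String targetAlias) {
        return "mm2-offset-syncs." + targetAlias + ".internal";
    }

    public static void main(String[] args) {
        System.out.println(remoteTopic("A", "testTopic")); // A.testTopic
        System.out.println(checkpointsTopic("A"));         // A.checkpoints.internal
        System.out.println(offsetSyncsTopic("B"));         // mm2-offset-syncs.B.internal
    }
}{code}

For this setup, A.checkpoints.internal on the target cluster (localhost:9092) is the topic that should contain entries for testGroup.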
 

+*Steps to replicate:*+
 * Create a topic on the source cluster.
 * Produce some data to the topic using the console producer.
 * Start a consumer in assign mode that reads from only one partition of that topic.

 
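{code:java}
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class AssignModeConsumer { // class name is illustrative only
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9082"); // source cluster A
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // group.id is set, so offsets are committed (auto-commit is enabled by default) even with assign()
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "7");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);

        // Assign a single partition manually instead of subscribing (no group rebalancing)
        TopicPartition tp = new TopicPartition("testTopic", 1);
        consumer.assign(Collections.singleton(tp));

        // Poll until the process is stopped manually
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.println(new String(record.value(), StandardCharsets.UTF_8) + "__" + record.partition());
            }
        }
    }
}{code}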
 * Stop the consumer midway, then describe the consumer group on the source cluster to get its lag information.

{code:bash}
bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9082 --group testGroup
GROUP      TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
testGroup  testTopic  0          5               28              23{code}
 * Run RemoteClusterUtils.translateOffsets to print the translated (downstream) offsets.

 
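{code:java}
// translateOffsets takes a Map<String, Object>, so a java.util.Properties instance cannot be passed directly
Map<String, Object> targetProps = new HashMap<>();
targetProps.put("bootstrap.servers", "localhost:9092"); // target cluster B, which hosts A.checkpoints.internal
Map<TopicPartition, OffsetAndMetadata> newOffsets =
        RemoteClusterUtils.translateOffsets(targetProps, "A", "testGroup", Duration.ofMillis(5500));
System.out.println(newOffsets);{code}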
 * *{color:#ff0000}An empty map is returned{color}*
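The describe output in the steps above shows that testGroup does have a committed offset on the source cluster even though the consumer used assign(): the tool reports LAG as LOG-END-OFFSET minus CURRENT-OFFSET, i.e. 28 - 5 = 23. A trivial sketch of that arithmetic (the class name ConsumerLag is hypothetical):

{code:java}
// Hypothetical helper: the lag shown by kafka-consumer-groups.sh is the distance
// between a partition's log-end offset and the group's last committed offset.
final class ConsumerLag {
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        // Values from the describe output: CURRENT-OFFSET 5, LOG-END-OFFSET 28
        System.out.println(lag(28, 5)); // 23
    }
}{code}

So there is a committed offset (5) on the source side that translateOffsets could, in principle, have translated.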

*+Expected Outcome:+ {color:#00875a}The translated committed offset should have been returned.{color}*
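The expected outcome relies on MirrorMaker 2.0 recording pairs of upstream and downstream offsets ("offset syncs") for each replicated partition and using them to translate a group's committed offset. The following is a simplified sketch of that idea, assuming translation uses only the most recent sync for the partition and returns -1 when the committed offset is older than that sync; OffsetTranslation, OffsetSync, and the sample values are hypothetical, not MirrorMaker's own classes.

{code:java}
// Simplified, hypothetical sketch of offset-sync based translation.
// It is NOT the actual MirrorMaker 2.0 implementation.
final class OffsetTranslation {
    // One (upstream, downstream) offset pair recorded for a replicated partition.
    static final class OffsetSync {
        final long upstreamOffset;
        final long downstreamOffset;

        OffsetSync(long upstreamOffset, long downstreamOffset) {
            this.upstreamOffset = upstreamOffset;
            this.downstreamOffset = downstreamOffset;
        }
    }

    // Returns the translated downstream offset, or -1 if the committed upstream offset
    // is older than the sync and therefore cannot be translated with it.
    static long translateDownstream(OffsetSync latestSync, long upstreamOffset) {
        if (latestSync.upstreamOffset > upstreamOffset) {
            return -1L;
        }
        long step = upstreamOffset - latestSync.upstreamOffset;
        return latestSync.downstreamOffset + step;
    }

    public static void main(String[] args) {
        // Hypothetical sync: upstream offset 0 was written at downstream offset 0.
        OffsetSync sync = new OffsetSync(0L, 0L);
        System.out.println(translateDownstream(sync, 5L)); // 5
    }
}{code}

Under these assumptions, testGroup's committed offset 5 would be translated to some downstream offset, which is why an empty map is unexpected.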

+*My Debugging*+

While debugging, I found that the checkpoints topic on the target cluster contained no checkpoint for this group's committed offset.

I tried multiple times with different commit frequencies and topic/group names, without success. Checkpoints are emitted only for consumers running in subscribe mode and for the console consumer started with the --group flag.
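This finding accounts for the empty result: translateOffsets reads the checkpoints topic on the target cluster and keeps only the records whose consumer group matches the requested one, so if no checkpoint was ever emitted for testGroup, the returned map is empty. A minimal sketch of that filtering step, assuming a simplified checkpoint record keyed by a "topic-partition" string (CheckpointLookup, Checkpoint, and the group subscribeGroup are hypothetical):

{code:java}
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of looking up a group's checkpoints; not the MirrorMaker API.
final class CheckpointLookup {
    // Simplified checkpoint: consumer group, remote topic-partition, translated offset.
    static final class Checkpoint {
        final String group;
        final String topicPartition;
        final long downstreamOffset;

        Checkpoint(String group, String topicPartition, long downstreamOffset) {
            this.group = group;
            this.topicPartition = topicPartition;
            this.downstreamOffset = downstreamOffset;
        }
    }

    // Keeps only checkpoints for the requested group; later records overwrite earlier
    // ones, so the map ends up with the latest checkpoint per partition.
    static Map<String, Long> offsetsForGroup(List<Checkpoint> checkpoints, String group) {
        Map<String, Long> offsets = new HashMap<>();
        for (Checkpoint cp : checkpoints) {
            if (cp.group.equals(group)) {
                offsets.put(cp.topicPartition, cp.downstreamOffset);
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Only a subscribe-mode group has a checkpoint here, so testGroup gets an empty map.
        List<Checkpoint> topic = Arrays.asList(new Checkpoint("subscribeGroup", "A.testTopic-0", 5L));
        System.out.println(offsetsForGroup(topic, "testGroup"));      // {}
        System.out.println(offsetsForGroup(topic, "subscribeGroup")); // {A.testTopic-0=5}
    }
}{code}

In other words, the empty map is a symptom; the open question is why no checkpoint is emitted for an assign-mode group in the first place.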

 



> Mirror Maker 2.0 RemoteClusterUtils do not return offset map for Assign Mode Consumers
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10609
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10609
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.5.0
>         Environment: Ubuntu 19, 8-core, 16 GB machine
>            Reporter: Ananya
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)