Posted to dev@kafka.apache.org by "Carlo Bongiovanni (Jira)" <ji...@apache.org> on 2020/11/04 17:21:00 UTC

[jira] [Created] (KAFKA-10681) MM2 translateOffsets returns wrong offsets

Carlo Bongiovanni created KAFKA-10681:
-----------------------------------------

             Summary: MM2 translateOffsets returns wrong offsets
                 Key: KAFKA-10681
                 URL: https://issues.apache.org/jira/browse/KAFKA-10681
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
    Affects Versions: 2.5.0
         Environment: GKE, strimzi release
            Reporter: Carlo Bongiovanni


Hi all,

we'd like to use MM2's ability to checkpoint consumer group offsets so that we can fail over gracefully from an active cluster to a standby one.

For this reason we have set up the following (FYI, all done with Strimzi on Kubernetes):
 * an active Kafka 2.5.0 cluster used by a few producers and consumers
 * a standby Kafka 2.5.0 cluster
 * MM2, set up in one direction only, mirroring from the active cluster to the standby one

We let MM2 run for some time and verified that messages were indeed being mirrored.

At this point we started developing tooling that creates consumer groups in the __consumer_offsets topic of the standby cluster by reading the internal checkpoints topic.

The following is an extract of our code to read the translated offsets:


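{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;
import static org.apache.kafka.clients.CommonClientConfigs.*;
import static org.apache.kafka.common.config.SaslConfigs.*;
import static org.apache.kafka.common.config.SslConfigs.*;

// cgi is the consumer group id to translate (defined elsewhere in our tooling)
Map<String, Object> mm2Props = new HashMap<>();
mm2Props.put(BOOTSTRAP_SERVERS_CONFIG, "bootstrap_servers");
mm2Props.put("source.cluster.alias", "euwe");
mm2Props.put(SASL_MECHANISM, "SCRAM-SHA-512");
mm2Props.put(SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"password\";");
mm2Props.put(SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
mm2Props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, "/usr/local/lib/jdk/lib/security/cacerts");
mm2Props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "some-password");
Map<TopicPartition, OffsetAndMetadata> translatedOffsets = RemoteClusterUtils
    .translateOffsets(mm2Props, (String) mm2Props.get("source.cluster.alias"), cgi,
        Duration.ofSeconds(60L));
{code}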
 

Before persisting the translated offsets with

{code:java}
// kafkaClient is an Admin client connected to the standby cluster;
// offsets is the filtered subset of translatedOffsets
AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = kafkaClient
    .alterConsumerGroupOffsets(cgi, offsets);
{code}

we filter them, because we don't want to create consumer groups for all of the retrieved offsets.

During the filtering, we compare the translated offset of each topic partition (as read from the checkpoints topic) with the current offset of the same partition on the standby cluster (as mirrored by MM2).
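The comparison above could be sketched roughly as follows. This is a hypothetical helper, not code from our tooling: it assumes that "current offset" means the latest (end) offset of the mirrored partition on the standby cluster, that those end offsets have already been fetched, and it keys partitions by plain strings such as "topic-0" instead of Kafka's TopicPartition so that the example runs without Kafka on the classpath. The values in main are illustrative.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class OffsetSanityCheck {

    // Hypothetical helper: returns the translated offsets that are larger than the
    // end offset of the same partition on the standby cluster.
    // Partitions are keyed by plain strings (e.g. "topic-0") to avoid a Kafka dependency.
    static Map<String, Long> offsetsBeyondEnd(Map<String, Long> translated,
                                              Map<String, Long> endOffsets) {
        Map<String, Long> suspicious = new HashMap<>();
        for (Map.Entry<String, Long> e : translated.entrySet()) {
            Long end = endOffsets.get(e.getKey());
            // A committed offset can be at most the partition's end offset,
            // so anything above it cannot be a valid position.
            if (end != null && e.getValue() > end) {
                suspicious.put(e.getKey(), e.getValue());
            }
        }
        return suspicious;
    }

    public static void main(String[] args) {
        Map<String, Long> translated = new HashMap<>();
        translated.put("topic-0", 200L);
        translated.put("other-0", 40L);
        Map<String, Long> endOffsets = new HashMap<>();
        endOffsets.put("topic-0", 100L);
        endOffsets.put("other-0", 50L);
        System.out.println(offsetsBeyondEnd(translated, endOffsets)); // {topic-0=200}
    }
}
{code}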

While running this check, we found that for some topics there is a large difference between the two values, while for other topics the translated offsets look plausible.

For example, a given target partition has an offset of 100 after being mirrored by MM2. From the checkpoints topic, for the same consumer group id, we receive an offset of 200, and later 150.

The issues are that:
 * both translated consumer group offsets exceed the actual offset of the partition
 * the consumer group offset read from the checkpoints topic goes down over time instead of up
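The second symptom can be detected with a similarly small check. This is a hypothetical sketch, not part of our tooling: it compares two successive snapshots of translated offsets for one consumer group and reports the partitions whose offset decreased, which should not happen unless the group was reset or rewound on the active cluster. Partitions are keyed by plain strings such as "topic-0" so the example runs without Kafka on the classpath.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetRegressionCheck {

    // Hypothetical helper: returns the partitions whose translated offset in the
    // current snapshot is lower than in the previous snapshot of the same group.
    static List<String> regressedPartitions(Map<String, Long> previous,
                                            Map<String, Long> current) {
        List<String> regressed = new ArrayList<>();
        for (Map.Entry<String, Long> e : current.entrySet()) {
            Long before = previous.get(e.getKey());
            // Partitions missing from the previous snapshot cannot be compared and are skipped.
            if (before != null && e.getValue() < before) {
                regressed.add(e.getKey());
            }
        }
        return regressed;
    }

    public static void main(String[] args) {
        Map<String, Long> first = new HashMap<>();
        first.put("topic-0", 200L);
        Map<String, Long> second = new HashMap<>();
        second.put("topic-0", 150L);
        System.out.println(regressedPartitions(first, second)); // [topic-0]
    }
}
{code}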

We haven't been able to explain this. The wrong numbers come from *RemoteClusterUtils.translateOffsets()*, and we're wondering whether this is a misconfiguration on our side or a bug in MM2.

Thanks, best
 C.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)