You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Badai Aqrandista (Jira)" <ji...@apache.org> on 2020/01/21 12:36:00 UTC

[jira] [Created] (KAFKA-9459) MM2 sync topic config does work

Badai Aqrandista created KAFKA-9459:
---------------------------------------

             Summary: MM2 sync topic config does work
                 Key: KAFKA-9459
                 URL: https://issues.apache.org/jira/browse/KAFKA-9459
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
    Affects Versions: 2.4.0
            Reporter: Badai Aqrandista


I have MM2 configured as follow:

{code:java}
{
        "name": "mm2-from-1-to-2",
        "config": {
          "connector.class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
          "topics":"foo",
          "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
          "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
          "sync.topic.configs.enabled":"true",
          "sync.topic.configs.interval.seconds": 60,
          "sync.topic.acls.enabled": "false",
          "replication.factor": 1,
          "offset-syncs.topic.replication.factor": 1,
          "heartbeats.topic.replication.factor": 1,
          "checkpoints.topic.replication.factor": 1,

          "target.cluster.alias":"dest",
          "target.cluster.bootstrap.servers":"dest.example.com:9092",

          "source.cluster.alias":"src",
          "source.cluster.bootstrap.servers":"src.example.com:9092",

          "tasks.max": 1}
}
{code}

Topic "foo" is configured with "cleanup.policy=compact". But after waiting for 15 minutes, I still don't see "src.foo" in the destination cluster has "cleanup.policy=compact".

I had the connect node to run in TRACE level and I could not find any calls to describeConfigs (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L327). This implies it never actually get a list of topics that it needs to get topic configs from.

And I am suspecting this code (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L214-L220):


{code:java}
    private Set<String> topicsBeingReplicated() {
        return knownTopicPartitions.stream()
            .map(x -> x.topic())
            .distinct()
            .filter(x -> knownTargetTopics.contains(formatRemoteTopic(x)))
            .collect(Collectors.toSet());
    }
{code}

knownTopicPartitions contains topic-partitions from the source cluster.
knownTargetTopics contains topic-partitions from the target cluster, whose topic names contain source alias already.

So, why is topicsBeingReplicated (list of topic-partitions from source cluster) being filtered using knownTopicPartitions (list of topic-partitions from target cluster)?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)