You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "victor (Jira)" <ji...@apache.org> on 2020/05/13 10:22:00 UTC
[jira] [Commented] (KAFKA-9459) MM2 sync topic config does not work
[ https://issues.apache.org/jira/browse/KAFKA-9459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106182#comment-17106182 ]
victor commented on KAFKA-9459:
-------------------------------
https://issues.apache.org/jira/browse/KAFKA-9981
> MM2 sync topic config does not work
> -----------------------------------
>
> Key: KAFKA-9459
> URL: https://issues.apache.org/jira/browse/KAFKA-9459
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.4.0
> Reporter: Badai Aqrandista
> Priority: Major
>
> I have MM2 configured as follow:
> {code:java}
> {
> "name": "mm2-from-1-to-2",
> "config": {
> "connector.class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
> "topics":"foo",
> "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
> "sync.topic.configs.enabled":"true",
> "sync.topic.configs.interval.seconds": 60,
> "sync.topic.acls.enabled": "false",
> "replication.factor": 1,
> "offset-syncs.topic.replication.factor": 1,
> "heartbeats.topic.replication.factor": 1,
> "checkpoints.topic.replication.factor": 1,
> "target.cluster.alias":"dest",
> "target.cluster.bootstrap.servers":"dest.example.com:9092",
> "source.cluster.alias":"src",
> "source.cluster.bootstrap.servers":"src.example.com:9092",
> "tasks.max": 1}
> }
> {code}
> Topic "foo" is configured with "cleanup.policy=compact". But after waiting for 15 minutes, I still don't see "src.foo" in the destination cluster has "cleanup.policy=compact".
> I had the connect node to run in TRACE level and I could not find any calls to describeConfigs (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L327). This implies it never actually get a list of topics that it needs to get topic configs from.
> And I am suspecting this code always return empty Set (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L214-L220):
> {code:java}
> private Set<String> topicsBeingReplicated() {
> return knownTopicPartitions.stream()
> .map(x -> x.topic())
> .distinct()
> .filter(x -> knownTargetTopics.contains(formatRemoteTopic(x)))
> .collect(Collectors.toSet());
> }
> {code}
> knownTopicPartitions contains topic-partitions from the source cluster.
> knownTargetTopics contains topic-partitions from the target cluster, whose topic names contain source alias already.
> So, why is topicsBeingReplicated (list of topic-partitions from source cluster) being filtered using knownTopicPartitions (list of topic-partitions from target cluster)?
--
This message was sent by Atlassian Jira
(v8.3.4#803005)