You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Badai Aqrandista (Jira)" <ji...@apache.org> on 2020/01/21 12:36:00 UTC
[jira] [Created] (KAFKA-9459) MM2 sync topic config does work
Badai Aqrandista created KAFKA-9459:
---------------------------------------
Summary: MM2 sync topic config does work
Key: KAFKA-9459
URL: https://issues.apache.org/jira/browse/KAFKA-9459
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 2.4.0
Reporter: Badai Aqrandista
I have MM2 configured as follow:
{code:java}
{
"name": "mm2-from-1-to-2",
"config": {
"connector.class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
"topics":"foo",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"sync.topic.configs.enabled":"true",
"sync.topic.configs.interval.seconds": 60,
"sync.topic.acls.enabled": "false",
"replication.factor": 1,
"offset-syncs.topic.replication.factor": 1,
"heartbeats.topic.replication.factor": 1,
"checkpoints.topic.replication.factor": 1,
"target.cluster.alias":"dest",
"target.cluster.bootstrap.servers":"dest.example.com:9092",
"source.cluster.alias":"src",
"source.cluster.bootstrap.servers":"src.example.com:9092",
"tasks.max": 1}
}
{code}
Topic "foo" is configured with "cleanup.policy=compact". But after waiting for 15 minutes, I still don't see "src.foo" in the destination cluster has "cleanup.policy=compact".
I had the connect node to run in TRACE level and I could not find any calls to describeConfigs (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L327). This implies it never actually get a list of topics that it needs to get topic configs from.
And I am suspecting this code (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L214-L220):
{code:java}
private Set<String> topicsBeingReplicated() {
return knownTopicPartitions.stream()
.map(x -> x.topic())
.distinct()
.filter(x -> knownTargetTopics.contains(formatRemoteTopic(x)))
.collect(Collectors.toSet());
}
{code}
knownTopicPartitions contains topic-partitions from the source cluster.
knownTargetTopics contains topic-partitions from the target cluster, whose topic names contain source alias already.
So, why is topicsBeingReplicated (list of topic-partitions from source cluster) being filtered using knownTopicPartitions (list of topic-partitions from target cluster)?
--
This message was sent by Atlassian Jira
(v8.3.4#803005)