Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/25 12:29:58 UTC

[GitHub] [kafka] mimaison commented on a change in pull request #11431: KAFKA-13397: Honor 'replication.policy.separator' configuration when creating MirrorMaker2 internal topics

mimaison commented on a change in pull request #11431:
URL: https://github.com/apache/kafka/pull/11431#discussion_r735543104



##########
File path: connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
##########
@@ -50,8 +50,7 @@
     public static final Class<?> REPLICATION_POLICY_CLASS_DEFAULT = DefaultReplicationPolicy.class;
     public static final String REPLICATION_POLICY_SEPARATOR = "replication.policy.separator";
     private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
-    public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
-        DefaultReplicationPolicy.SEPARATOR_DEFAULT;
+    public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT = ".";

Review comment:
       Isn't it better to keep the separator in `DefaultReplicationPolicy`?

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -183,12 +187,18 @@ public MirrorClientConfig clientConfig(String cluster) {
 
         // fill in reasonable defaults
         props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2");
-        props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets."
-                + sourceAndTarget.source() + ".internal");
-        props.putIfAbsent(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "mm2-status."
-                + sourceAndTarget.source() + ".internal");
-        props.putIfAbsent(DistributedConfig.CONFIG_TOPIC_CONFIG, "mm2-configs."
-                + sourceAndTarget.source() + ".internal");
+
+        String separator = originalsStrings().getOrDefault(REPLICATION_POLICY_SEPARATOR, REPLICATION_POLICY_SEPARATOR_DEFAULT);
+        if (separator.equals("-")) {

Review comment:
       We did not have this check before; why is it needed? Also, checks here are only applied when running in "driver" mode.
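
To make the "driver mode only" concern concrete, here is a minimal, self-contained sketch of pulling the check into one helper that any config path could call. `SeparatorValidation` and `validatedSeparator` are hypothetical names, not part of Kafka, and the sketch throws `IllegalArgumentException` instead of Kafka's `ConfigException` only so that it compiles without the Kafka jars.

```java
import java.util.Map;

// Hypothetical helper, not part of Kafka: a single place to validate the
// separator so the same rule applies regardless of which code path reads it.
final class SeparatorValidation {
    static final String SEPARATOR_KEY = "replication.policy.separator";
    static final String SEPARATOR_DEFAULT = ".";

    private SeparatorValidation() { }

    // Returns the configured separator, or the default when none is set.
    // Rejects a single dash, mirroring the check proposed in the hunk above.
    static String validatedSeparator(Map<String, String> props) {
        String separator = props.getOrDefault(SEPARATOR_KEY, SEPARATOR_DEFAULT);
        if ("-".equals(separator)) {
            // Stand-in for ConfigException (assumption made for self-containment).
            throw new IllegalArgumentException(
                "You should not use a single dash as a " + SEPARATOR_KEY);
        }
        return separator;
    }
}
```

Whether the check belongs in the code at all is the open question above; the sketch only shows that, if it stays, it does not have to live in the driver-only path.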

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -183,12 +187,18 @@ public MirrorClientConfig clientConfig(String cluster) {
 
         // fill in reasonable defaults
         props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2");
-        props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets."
-                + sourceAndTarget.source() + ".internal");
-        props.putIfAbsent(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "mm2-status."
-                + sourceAndTarget.source() + ".internal");
-        props.putIfAbsent(DistributedConfig.CONFIG_TOPIC_CONFIG, "mm2-configs."
-                + sourceAndTarget.source() + ".internal");
+
+        String separator = originalsStrings().getOrDefault(REPLICATION_POLICY_SEPARATOR, REPLICATION_POLICY_SEPARATOR_DEFAULT);
+        if (separator.equals("-")) {
+            throw new ConfigException("You should not use a single dash as a " + REPLICATION_POLICY_SEPARATOR);
+        }
+
+        props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets" + separator

Review comment:
       Should we actually try to use the configured `ReplicationPolicy` here instead of generating the topic names? What happens if you provide a custom policy in driver mode?
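
A rough sketch of what delegating to the policy could look like, as opposed to concatenating strings in `MirrorMakerConfig`. Everything here is an assumption made for illustration: `TopicNamingPolicy`, `SeparatorNamingPolicy` and `DriverTopicDefaults` are hypothetical and do not reproduce Kafka's real `ReplicationPolicy` interface, and the `internal` suffix joined by the separator is inferred from the removed lines because the added line in the hunk is cut off. The property keys are the string values behind the `DistributedConfig` constants in the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a replication policy that can name internal topics.
interface TopicNamingPolicy {
    String internalTopic(String prefix, String clusterAlias);
}

// Default-style policy: joins the parts with the configured separator.
// The "internal" suffix is an assumption based on the removed lines of the diff.
final class SeparatorNamingPolicy implements TopicNamingPolicy {
    private final String separator;

    SeparatorNamingPolicy(String separator) {
        this.separator = separator;
    }

    @Override
    public String internalTopic(String prefix, String clusterAlias) {
        return prefix + separator + clusterAlias + separator + "internal";
    }
}

// Hypothetical driver-side helper: asks the policy for each name instead of
// building it inline, so a custom policy is honored in driver mode too.
final class DriverTopicDefaults {
    private DriverTopicDefaults() { }

    static Map<String, String> defaults(TopicNamingPolicy policy, String source) {
        Map<String, String> props = new LinkedHashMap<>();
        props.putIfAbsent("offset.storage.topic", policy.internalTopic("mm2-offsets", source));
        props.putIfAbsent("status.storage.topic", policy.internalTopic("mm2-status", source));
        props.putIfAbsent("config.storage.topic", policy.internalTopic("mm2-configs", source));
        return props;
    }
}
```

With `new SeparatorNamingPolicy(".")` and source `primary`, this yields the same `mm2-offsets.primary.internal` style names as the removed lines, while a custom policy passed to `defaults` decides the names itself.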



