Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/10 16:58:19 UTC

[GitHub] [kafka] ivanyu commented on a change in pull request #9395: KAFKA-9726: Add LegacyReplicationPolicy for MM2

ivanyu commented on a change in pull request #9395:
URL: https://github.com/apache/kafka/pull/9395#discussion_r502810124



##########
File path: connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/LegacyReplicationPolicy.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.Configurable;
+
+import java.util.Map;
+
+import static org.apache.kafka.connect.mirror.MirrorClientConfig.HEARTBEATS_TOPIC;
+
+/**
+ * The replication policy that imitates the behavior of MirrorMaker 1.
+ *
+ * <p>The policy doesn't rename topics: {@code topic1} remains {@code topic1} after replication.
+ * There is one exception to this: for {@code heartbeats}, it behaves identically to {@link DefaultReplicationPolicy}.
+ *
+ * <p>The policy has some notable limitations. The most important one is that the policy is unable to detect
+ * cycles for any topic apart from {@code heartbeats}. This makes cross-replication effectively impossible.
+ *
+ * <p>Another limitation is that {@link MirrorClient#remoteTopics()} will be able to list only
+ * {@code heartbeats} topics.
+ *
+ * <p>{@link MirrorClient#countHopsForTopic(String, String)} will return {@code -1} for any topic
+ * apart from {@code heartbeats}.
+ *
+ * <p>The policy supports {@link DefaultReplicationPolicy}'s configurations
+ * for the behavior related to {@code heartbeats}.
+ */
+public class LegacyReplicationPolicy implements ReplicationPolicy, Configurable {
+    // Replication sub-policy for heartbeats topics
+    private final DefaultReplicationPolicy heartbeatTopicReplicationPolicy = new DefaultReplicationPolicy();
+
+    @Override
+    public void configure(final Map<String, ?> props) {
+        heartbeatTopicReplicationPolicy.configure(props);
+    }
+
+    @Override
+    public String formatRemoteTopic(final String sourceClusterAlias, final String topic) {
+        if (isOriginalTopicHeartbeats(topic)) {
+            return heartbeatTopicReplicationPolicy.formatRemoteTopic(sourceClusterAlias, topic);
+        } else {
+            return topic;
+        }
+    }
+
+    @Override
+    public String topicSource(final String topic) {
+        if (isOriginalTopicHeartbeats(topic)) {
+            return heartbeatTopicReplicationPolicy.topicSource(topic);
+        } else {
+            return null;

Review comment:
       I've explored this possibility, too. The main problem with it is that the replication policy would need to answer differently for the source and the target cluster. This distinction is essential for methods like `MirrorSourceConnector.isCycle` and `MirrorClient.remoteTopics`: for a source cluster, `topicSource` should return `null`; for a target cluster, it should return a predefined value.
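   To make the source/target asymmetry concrete, here is a simplified sketch. `TopicSourcePolicy` is a hypothetical one-method stand-in for `ReplicationPolicy`, and `isCycle`/`remoteTopics` only illustrate the idea behind `MirrorSourceConnector.isCycle` and `MirrorClient.remoteTopics`; they are not the actual MM2 code. The scenario assumes topics on a backup cluster that were replicated from `primary-cluster` and are now considered for replication back to `primary-cluster`.

   ```java
   import java.util.List;
   import java.util.stream.Collectors;

   public class TopicSourceUsageSketch {
       // Hypothetical, trimmed-down stand-in for ReplicationPolicy: only topicSource() is modeled.
       interface TopicSourcePolicy {
           String topicSource(String topic);
       }

       // Simplified single-hop check in the spirit of MirrorSourceConnector.isCycle:
       // skip a topic if it originated on the cluster we are about to replicate it to.
       static boolean isCycle(TopicSourcePolicy policy, String topic, String targetAlias) {
           String source = policy.topicSource(topic);
           return source != null && source.equals(targetAlias);
       }

       // Simplified take on the idea behind MirrorClient.remoteTopics:
       // a topic counts as remote only if the policy can name its source cluster.
       static List<String> remoteTopics(TopicSourcePolicy policy, List<String> allTopics) {
           return allTopics.stream()
                   .filter(t -> policy.topicSource(t) != null)
                   .collect(Collectors.toList());
       }

       public static void main(String[] args) {
           List<String> topicsOnBackup = List.of("topic1", "topic2");

           // Source-side answer: no upstream source can be named.
           TopicSourcePolicy sourceSide = topic -> null;
           // Target-side answer: every topic came from a configured alias.
           TopicSourcePolicy targetSide = topic -> "primary-cluster";

           // Replicating backup -> primary: only the target-side answer detects the cycle.
           System.out.println(isCycle(sourceSide, "topic1", "primary-cluster")); // false
           System.out.println(isCycle(targetSide, "topic1", "primary-cluster")); // true

           System.out.println(remoteTopics(sourceSide, topicsOnBackup)); // []
           System.out.println(remoteTopics(targetSide, topicsOnBackup)); // [topic1, topic2]
       }
   }
   ```

   With a single, side-agnostic `LegacyReplicationPolicy` that always returns `null`, the backup cluster's `topic1` would be sent back to `primary-cluster`, which is exactly the cross-replication problem described in the Javadoc.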
   
   That leaves two possibilities. In the first, we set up two different replication policy instances with different configurations, e.g.:
   ```
   replication.policy.source.class=org.apache.kafka.connect.mirror.LegacyReplicationPolicy
   replication.policy.source.source=
   replication.policy.target.class=org.apache.kafka.connect.mirror.LegacyReplicationPolicy
   replication.policy.target.source=primary-cluster
   ```
   
   Of course, we can make sure that the current configurations keep working as before.
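   A rough sketch of how the first option could be wired, assuming the proposed `replication.policy.source.` / `replication.policy.target.` prefixes and the `source` key from the configuration example; none of these are existing MM2 settings. Only `replication.policy.class` exists today, and the fallback to it is one hypothetical way to keep current configurations working. The class and method names are illustrative.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class RoleScopedPolicyConfigSketch {
       // Existing, role-agnostic MM2 setting.
       static final String LEGACY_CLASS_KEY = "replication.policy.class";

       // Collects properties under the hypothetical prefix "replication.policy.<role>."
       // (role = "source" or "target") and strips that prefix.
       static Map<String, String> forRole(Map<String, String> props, String role) {
           String prefix = "replication.policy." + role + ".";
           Map<String, String> scoped = new HashMap<>();
           for (Map.Entry<String, String> entry : props.entrySet()) {
               if (entry.getKey().startsWith(prefix)) {
                   scoped.put(entry.getKey().substring(prefix.length()), entry.getValue());
               }
           }
           // Backward compatibility: without a role-specific class, reuse the existing setting.
           if (!scoped.containsKey("class") && props.containsKey(LEGACY_CLASS_KEY)) {
               scoped.put("class", props.get(LEGACY_CLASS_KEY));
           }
           return scoped;
       }

       // Hypothetical "source" setting: empty or absent means topicSource() returns null.
       static String configuredTopicSource(Map<String, String> scoped) {
           String source = scoped.get("source");
           return (source == null || source.isEmpty()) ? null : source;
       }

       public static void main(String[] args) {
           Map<String, String> props = new HashMap<>();
           props.put("replication.policy.source.class", "org.apache.kafka.connect.mirror.LegacyReplicationPolicy");
           props.put("replication.policy.source.source", "");
           props.put("replication.policy.target.class", "org.apache.kafka.connect.mirror.LegacyReplicationPolicy");
           props.put("replication.policy.target.source", "primary-cluster");

           System.out.println(configuredTopicSource(forRole(props, "source"))); // null
           System.out.println(configuredTopicSource(forRole(props, "target"))); // primary-cluster
       }
   }
   ```

   Each scoped map would then be passed to the `configure` method of its own policy instance, so the source-side and target-side instances can give different `topicSource` answers.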
   
   Another possibility is to modify the `ReplicationPolicy` interface to allow it to pass additional information out (like `canTrackSource` or similar) or in (like `topicSource(String topic, boolean isSourceCluster)`).
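   A minimal sketch of the second option in its "pass information in" form, assuming a hypothetical `topicSource(String topic, boolean isSourceCluster)` overload added as a default method. `BasePolicy`, `SourceAwarePolicy`, and `NonRenamingPolicy` are illustrative names, not Kafka APIs, and the `heartbeats` special case is left out.

   ```java
   public class SourceAwarePolicySketch {
       // Trimmed-down stand-in for ReplicationPolicy; only topicSource() is modeled.
       interface BasePolicy {
           String topicSource(String topic);
       }

       // Hypothetical extension: the caller says which side of the flow the topic lives on.
       // The default delegates to the old method, so existing policies behave as before.
       interface SourceAwarePolicy extends BasePolicy {
           default String topicSource(String topic, boolean isSourceCluster) {
               return topicSource(topic);
           }
       }

       // Legacy-style policy: names are never changed, so only the side determines the answer.
       static final class NonRenamingPolicy implements SourceAwarePolicy {
           private final String upstreamAlias;

           NonRenamingPolicy(String upstreamAlias) {
               this.upstreamAlias = upstreamAlias;
           }

           @Override
           public String topicSource(String topic) {
               return null; // cannot be inferred from the topic name alone
           }

           @Override
           public String topicSource(String topic, boolean isSourceCluster) {
               return isSourceCluster ? null : upstreamAlias;
           }
       }

       public static void main(String[] args) {
           SourceAwarePolicy legacy = new NonRenamingPolicy("primary-cluster");
           // Rough stand-in for an "<alias>.<topic>" naming scheme, for comparison.
           SourceAwarePolicy prefixed = topic -> topic.contains(".") ? topic.substring(0, topic.indexOf('.')) : null;

           System.out.println(legacy.topicSource("topic1", true));            // null
           System.out.println(legacy.topicSource("topic1", false));           // primary-cluster
           System.out.println(prefixed.topicSource("primary.topic1", false)); // primary
       }
   }
   ```

   Making the overload a default method keeps existing `ReplicationPolicy` implementations source-compatible; the "pass information out" variant (a `canTrackSource`-style flag) would instead let callers skip source-based checks for policies that cannot answer.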
   
   What do you think would be the best approach?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.