You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2021/10/11 14:37:49 UTC

[kafka] branch trunk updated: KAFKA-10777: Add additional configuration to control MirrorMaker 2 internal topics naming convention - KIP-690 (#11220)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 400d39b  KAFKA-10777: Add additional configuration to control MirrorMaker 2 internal topics naming convention - KIP-690 (#11220)
400d39b is described below

commit 400d39bb0efaa4fdda4c0597a63a1ddd17082291
Author: Omnia G H Ibrahim <o....@gmail.com>
AuthorDate: Mon Oct 11 15:35:12 2021 +0100

    KAFKA-10777: Add additional configuration to control MirrorMaker 2 internal topics naming convention - KIP-690 (#11220)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>
---
 .../connect/mirror/DefaultReplicationPolicy.java   | 28 ++++++++++++++++++
 .../connect/mirror/IdentityReplicationPolicy.java  |  2 +-
 .../apache/kafka/connect/mirror/MirrorClient.java  | 12 ++++----
 .../kafka/connect/mirror/MirrorClientConfig.java   |  3 --
 .../kafka/connect/mirror/ReplicationPolicy.java    | 34 ++++++++++++++++++++--
 .../connect/mirror/MirrorConnectorConfig.java      |  9 ++----
 .../connect/mirror/MirrorSourceConnector.java      |  7 +----
 7 files changed, 70 insertions(+), 25 deletions(-)

diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
index 30d7534..9de50b6 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
@@ -70,4 +70,32 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
             return topic.substring(source.length() + separator.length());
         }
     }
+
+    private String internalSuffix() {
+        return separator + "internal";
+    }
+
+    private String checkpointsTopicSuffix() {
+        return separator + "checkpoints" + internalSuffix();
+    }
+
+    @Override
+    public String offsetSyncsTopic(String clusterAlias) {
+        return "mm2-offset-syncs" + separator + clusterAlias + internalSuffix();
+    }
+
+    @Override
+    public String checkpointsTopic(String clusterAlias) {
+        return clusterAlias + checkpointsTopicSuffix();
+    }
+
+    @Override
+    public boolean isCheckpointsTopic(String topic) {
+        return  topic.endsWith(checkpointsTopicSuffix());
+    }
+
+    @Override
+    public boolean isMM2InternalTopic(String topic) {
+        return  topic.endsWith(internalSuffix());
+    }
 }
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java
index 7320b18..1f0df63 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java
@@ -87,6 +87,6 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
     }
 
     private boolean looksLikeHeartbeat(String topic) {
-        return topic != null && topic.endsWith(MirrorClientConfig.HEARTBEATS_TOPIC);
+        return topic != null && topic.endsWith(heartbeatsTopic());
     }
 }
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
index aa0bca1..28790c1 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
@@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.types.SchemaException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
- 
+
 import java.time.Duration;
 import java.util.Set;
 import java.util.HashSet;
@@ -156,7 +156,7 @@ public class MirrorClient implements AutoCloseable {
         try {
             // checkpoint topics are not "remote topics", as they are not replicated. So we don't need
             // to use ReplicationPolicy to create the checkpoint topic here.
-            String checkpointTopic = remoteClusterAlias + MirrorClientConfig.CHECKPOINTS_TOPIC_SUFFIX;
+            String checkpointTopic = replicationPolicy.checkpointsTopic(remoteClusterAlias);
             List<TopicPartition> checkpointAssignment =
                 Collections.singletonList(new TopicPartition(checkpointTopic, 0));
             consumer.assign(checkpointAssignment);
@@ -209,17 +209,15 @@ public class MirrorClient implements AutoCloseable {
             }
             visited.add(source);
             topic = replicationPolicy.upstreamTopic(topic);
-        } 
+        }
     }
 
     boolean isHeartbeatTopic(String topic) {
-        // heartbeats are replicated, so we must use ReplicationPolicy here
-        return MirrorClientConfig.HEARTBEATS_TOPIC.equals(replicationPolicy.originalTopic(topic));
+        return replicationPolicy.isHeartbeatsTopic(topic);
     }
 
     boolean isCheckpointTopic(String topic) {
-        // checkpoints are not replicated, so we don't need to use ReplicationPolicy here
-        return topic.endsWith(MirrorClientConfig.CHECKPOINTS_TOPIC_SUFFIX);
+        return replicationPolicy.isCheckpointsTopic(topic);
     }
 
     boolean isRemoteTopic(String topic) {
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
index 9292198..4305366 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -57,9 +57,6 @@ public class MirrorClientConfig extends AbstractConfig {
     public static final String CONSUMER_CLIENT_PREFIX = "consumer.";
     public static final String PRODUCER_CLIENT_PREFIX = "producer.";
 
-    static final String CHECKPOINTS_TOPIC_SUFFIX = ".checkpoints.internal"; // internal so not replicated
-    static final String HEARTBEATS_TOPIC = "heartbeats";
- 
     MirrorClientConfig(Map<?, ?> props) {
         super(CONFIG_DEF, props, true);
     }
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
index b6eb26c..0a9130b 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
@@ -52,9 +52,39 @@ public interface ReplicationPolicy {
         }
     }
 
+    /** Returns the heartbeats topic name. */
+    default String heartbeatsTopic() {
+        return "heartbeats";
+    }
+
+    /** Returns the offset-syncs topic for the given cluster alias. */
+    default String offsetSyncsTopic(String clusterAlias) {
+        return "mm2-offset-syncs." + clusterAlias + ".internal";
+    }
+
+    /** Returns the name of the checkpoints topic for the given cluster alias. */
+    default String checkpointsTopic(String clusterAlias) {
+        return clusterAlias + ".checkpoints.internal";
+    }
+
+    /** Checks if the topic is a heartbeats topic, e.g. heartbeats, us-west.heartbeats. */
+    default boolean isHeartbeatsTopic(String topic) {
+        return heartbeatsTopic().equals(originalTopic(topic));
+    }
+
+    /** Checks if the topic is a checkpoints topic. */
+    default boolean isCheckpointsTopic(String topic) {
+        return  topic.endsWith(".checkpoints.internal");
+    }
+
+    /** Checks if the topic is one of the MM2 internal topics. This is used to make sure the topic is not replicated. */
+    default boolean isMM2InternalTopic(String topic) {
+        return  topic.endsWith(".internal");
+    }
+
     /** Internal topics are never replicated. */
     default boolean isInternalTopic(String topic) {
-        return topic.endsWith(".internal") || topic.endsWith("-internal") || topic.startsWith("__")
-            || topic.startsWith(".");
+        boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".") ||  topic.endsWith("-internal");
+        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
     }
 }
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index da1cb27..c964ab0 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -343,8 +343,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
         String otherClusterAlias = SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation())
                 ? targetClusterAlias()
                 : sourceClusterAlias();
-        // ".internal" suffix ensures this doesn't get replicated
-        return "mm2-offset-syncs." + otherClusterAlias + ".internal";
+        return replicationPolicy().offsetSyncsTopic(otherClusterAlias);
     }
 
     String offsetSyncsTopicLocation() {
@@ -370,7 +369,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
     }
 
     String heartbeatsTopic() {
-        return MirrorClientConfig.HEARTBEATS_TOPIC;
+        return replicationPolicy().heartbeatsTopic();
     }
 
     // e.g. source1.heartbeats
@@ -379,9 +378,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
     }
 
     String checkpointsTopic() {
-        // Checkpoint topics are not "remote topics", as they are not replicated, so we don't
-        // need to use ReplicationPolicy here.
-        return sourceClusterAlias() + MirrorClientConfig.CHECKPOINTS_TOPIC_SUFFIX;
+        return replicationPolicy().checkpointsTopic(sourceClusterAlias());
     }
 
     long maxOffsetLag() {
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 0e3ccc9..b2cbe02 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -471,7 +471,7 @@ public class MirrorSourceConnector extends SourceConnector {
     }
 
     boolean shouldReplicateTopic(String topic) {
-        return (topicFilter.shouldReplicateTopic(topic) || isHeartbeatTopic(topic))
+        return (topicFilter.shouldReplicateTopic(topic) || replicationPolicy.isHeartbeatsTopic(topic))
             && !replicationPolicy.isInternalTopic(topic) && !isCycle(topic);
     }
 
@@ -501,11 +501,6 @@ public class MirrorSourceConnector extends SourceConnector {
         }
     }
 
-    // e.g. heartbeats, us-west.heartbeats
-    boolean isHeartbeatTopic(String topic) {
-        return MirrorClientConfig.HEARTBEATS_TOPIC.equals(replicationPolicy.originalTopic(topic));
-    }
-
     String formatRemoteTopic(String topic) {
         return replicationPolicy.formatRemoteTopic(sourceAndTarget.source(), topic);
     }