You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2021/10/11 14:37:49 UTC
[kafka] branch trunk updated: KAFKA-10777: Add additional
configuration to control MirrorMaker 2 internal topics naming convention -
KIP-690 (#11220)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 400d39b KAFKA-10777: Add additional configuration to control MirrorMaker 2 internal topics naming convention - KIP-690 (#11220)
400d39b is described below
commit 400d39bb0efaa4fdda4c0597a63a1ddd17082291
Author: Omnia G H Ibrahim <o....@gmail.com>
AuthorDate: Mon Oct 11 15:35:12 2021 +0100
KAFKA-10777: Add additional configuration to control MirrorMaker 2 internal topics naming convention - KIP-690 (#11220)
Reviewers: Mickael Maison <mi...@gmail.com>
---
.../connect/mirror/DefaultReplicationPolicy.java | 28 ++++++++++++++++++
.../connect/mirror/IdentityReplicationPolicy.java | 2 +-
.../apache/kafka/connect/mirror/MirrorClient.java | 12 ++++----
.../kafka/connect/mirror/MirrorClientConfig.java | 3 --
.../kafka/connect/mirror/ReplicationPolicy.java | 34 ++++++++++++++++++++--
.../connect/mirror/MirrorConnectorConfig.java | 9 ++----
.../connect/mirror/MirrorSourceConnector.java | 7 +----
7 files changed, 70 insertions(+), 25 deletions(-)
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
index 30d7534..9de50b6 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
@@ -70,4 +70,32 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
return topic.substring(source.length() + separator.length());
}
}
+
+ private String internalSuffix() {
+ return separator + "internal";
+ }
+
+ private String checkpointsTopicSuffix() {
+ return separator + "checkpoints" + internalSuffix();
+ }
+
+ @Override
+ public String offsetSyncsTopic(String clusterAlias) {
+ return "mm2-offset-syncs" + separator + clusterAlias + internalSuffix();
+ }
+
+ @Override
+ public String checkpointsTopic(String clusterAlias) {
+ return clusterAlias + checkpointsTopicSuffix();
+ }
+
+ @Override
+ public boolean isCheckpointsTopic(String topic) {
+ return topic.endsWith(checkpointsTopicSuffix());
+ }
+
+ @Override
+ public boolean isMM2InternalTopic(String topic) {
+ return topic.endsWith(internalSuffix());
+ }
}
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java
index 7320b18..1f0df63 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java
@@ -87,6 +87,6 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
}
private boolean looksLikeHeartbeat(String topic) {
- return topic != null && topic.endsWith(MirrorClientConfig.HEARTBEATS_TOPIC);
+ return topic != null && topic.endsWith(heartbeatsTopic());
}
}
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
index aa0bca1..28790c1 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
@@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.types.SchemaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+
import java.time.Duration;
import java.util.Set;
import java.util.HashSet;
@@ -156,7 +156,7 @@ public class MirrorClient implements AutoCloseable {
try {
// checkpoint topics are not "remote topics", as they are not replicated. So we don't need
// to use ReplicationPolicy to create the checkpoint topic here.
- String checkpointTopic = remoteClusterAlias + MirrorClientConfig.CHECKPOINTS_TOPIC_SUFFIX;
+ String checkpointTopic = replicationPolicy.checkpointsTopic(remoteClusterAlias);
List<TopicPartition> checkpointAssignment =
Collections.singletonList(new TopicPartition(checkpointTopic, 0));
consumer.assign(checkpointAssignment);
@@ -209,17 +209,15 @@ public class MirrorClient implements AutoCloseable {
}
visited.add(source);
topic = replicationPolicy.upstreamTopic(topic);
- }
+ }
}
boolean isHeartbeatTopic(String topic) {
- // heartbeats are replicated, so we must use ReplicationPolicy here
- return MirrorClientConfig.HEARTBEATS_TOPIC.equals(replicationPolicy.originalTopic(topic));
+ return replicationPolicy.isHeartbeatsTopic(topic);
}
boolean isCheckpointTopic(String topic) {
- // checkpoints are not replicated, so we don't need to use ReplicationPolicy here
- return topic.endsWith(MirrorClientConfig.CHECKPOINTS_TOPIC_SUFFIX);
+ return replicationPolicy.isCheckpointsTopic(topic);
}
boolean isRemoteTopic(String topic) {
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
index 9292198..4305366 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -57,9 +57,6 @@ public class MirrorClientConfig extends AbstractConfig {
public static final String CONSUMER_CLIENT_PREFIX = "consumer.";
public static final String PRODUCER_CLIENT_PREFIX = "producer.";
- static final String CHECKPOINTS_TOPIC_SUFFIX = ".checkpoints.internal"; // internal so not replicated
- static final String HEARTBEATS_TOPIC = "heartbeats";
-
MirrorClientConfig(Map<?, ?> props) {
super(CONFIG_DEF, props, true);
}
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
index b6eb26c..0a9130b 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
@@ -52,9 +52,39 @@ public interface ReplicationPolicy {
}
}
+ /** Returns heartbeats topic name.*/
+ default String heartbeatsTopic() {
+ return "heartbeats";
+ }
+
+ /** Returns the offset-syncs topic for given cluster alias. */
+ default String offsetSyncsTopic(String clusterAlias) {
+ return "mm2-offset-syncs." + clusterAlias + ".internal";
+ }
+
+ /** Returns the name checkpoint topic for given cluster alias. */
+ default String checkpointsTopic(String clusterAlias) {
+ return clusterAlias + ".checkpoints.internal";
+ }
+
+ /** check if topic is a heartbeat topic, e.g heartbeats, us-west.heartbeats. */
+ default boolean isHeartbeatsTopic(String topic) {
+ return heartbeatsTopic().equals(originalTopic(topic));
+ }
+
+ /** check if topic is a checkpoint topic. */
+ default boolean isCheckpointsTopic(String topic) {
+ return topic.endsWith(".checkpoints.internal");
+ }
+
+ /** Check topic is one of MM2 internal topic, this is used to make sure the topic doesn't need to be replicated.*/
+ default boolean isMM2InternalTopic(String topic) {
+ return topic.endsWith(".internal");
+ }
+
/** Internal topics are never replicated. */
default boolean isInternalTopic(String topic) {
- return topic.endsWith(".internal") || topic.endsWith("-internal") || topic.startsWith("__")
- || topic.startsWith(".");
+ boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".") || topic.endsWith("-internal");
+ return isMM2InternalTopic(topic) || isKafkaInternalTopic;
}
}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index da1cb27..c964ab0 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -343,8 +343,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
String otherClusterAlias = SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation())
? targetClusterAlias()
: sourceClusterAlias();
- // ".internal" suffix ensures this doesn't get replicated
- return "mm2-offset-syncs." + otherClusterAlias + ".internal";
+ return replicationPolicy().offsetSyncsTopic(otherClusterAlias);
}
String offsetSyncsTopicLocation() {
@@ -370,7 +369,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
}
String heartbeatsTopic() {
- return MirrorClientConfig.HEARTBEATS_TOPIC;
+ return replicationPolicy().heartbeatsTopic();
}
// e.g. source1.heartbeats
@@ -379,9 +378,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
}
String checkpointsTopic() {
- // Checkpoint topics are not "remote topics", as they are not replicated, so we don't
- // need to use ReplicationPolicy here.
- return sourceClusterAlias() + MirrorClientConfig.CHECKPOINTS_TOPIC_SUFFIX;
+ return replicationPolicy().checkpointsTopic(sourceClusterAlias());
}
long maxOffsetLag() {
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 0e3ccc9..b2cbe02 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -471,7 +471,7 @@ public class MirrorSourceConnector extends SourceConnector {
}
boolean shouldReplicateTopic(String topic) {
- return (topicFilter.shouldReplicateTopic(topic) || isHeartbeatTopic(topic))
+ return (topicFilter.shouldReplicateTopic(topic) || replicationPolicy.isHeartbeatsTopic(topic))
&& !replicationPolicy.isInternalTopic(topic) && !isCycle(topic);
}
@@ -501,11 +501,6 @@ public class MirrorSourceConnector extends SourceConnector {
}
}
- // e.g. heartbeats, us-west.heartbeats
- boolean isHeartbeatTopic(String topic) {
- return MirrorClientConfig.HEARTBEATS_TOPIC.equals(replicationPolicy.originalTopic(topic));
- }
-
String formatRemoteTopic(String topic) {
return replicationPolicy.formatRemoteTopic(sourceAndTarget.source(), topic);
}