Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/26 05:20:58 UTC

[GitHub] [kafka] dhruvilshah3 opened a new pull request #10217: KAFKA-12254: Ensure MM2 creates topics with source topic configs

dhruvilshah3 opened a new pull request #10217:
URL: https://github.com/apache/kafka/pull/10217


   MM2 creates new topics on the destination cluster with default configurations. It has an async periodic task to refresh topic configurations from the source to the destination. However, this opens up a window in which data is produced to the destination cluster under default configurations. In the worst case, this could cause data loss, for example if the destination topic is created without the right retention or cleanup policy configurations.
   
   This patch fixes the above issue by ensuring that the right configurations are supplied to `AdminClient#createTopics` when MM2 creates topics on the destination cluster.
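
A minimal sketch of the difference described above, using only the Java standard library. `TopicRequest`, `withDefaults`, and `withSourceConfigs` are hypothetical stand-ins introduced for illustration, not Kafka classes; in the diff quoted later in this thread the same idea appears as `new NewTopic(...).configs(configs)`. The topic name `source.orders` and the config values are assumed examples.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class TopicCreationSketch {
    /** Hypothetical stand-in for a topic-creation request; not a Kafka class. */
    static final class TopicRequest {
        final String name;
        final int partitions;
        final Map<String, String> configs;

        TopicRequest(String name, int partitions, Map<String, String> configs) {
            this.name = name;
            this.partitions = partitions;
            // Defensive copy so later changes to the caller's map do not leak in.
            this.configs = Collections.unmodifiableMap(new LinkedHashMap<>(configs));
        }
    }

    // The problem case: no configs on the request, so the destination applies
    // its defaults until the periodic refresh catches up.
    static TopicRequest withDefaults(String remoteTopic, int partitions) {
        return new TopicRequest(remoteTopic, partitions, Collections.emptyMap());
    }

    // The fixed case: the source topic's configs travel with the creation
    // request, so the topic never exists with default settings.
    static TopicRequest withSourceConfigs(String remoteTopic, int partitions,
                                          Map<String, String> sourceConfigs) {
        return new TopicRequest(remoteTopic, partitions, sourceConfigs);
    }

    public static void main(String[] args) {
        // Assumed example values for two standard topic-level configs.
        Map<String, String> sourceConfigs = new LinkedHashMap<>();
        sourceConfigs.put("cleanup.policy", "compact");
        sourceConfigs.put("retention.ms", "-1");

        TopicRequest before = withDefaults("source.orders", 3);
        TopicRequest after = withSourceConfigs("source.orders", 3, sourceConfigs);
        System.out.println("before: " + before.configs);
        System.out.println("after:  " + after.configs);
    }
}
```

With the first shape, a compacted source topic could briefly be mirrored into a topic that uses the destination's default cleanup policy; with the second, the policy is set at creation time.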
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dhruvilshah3 commented on a change in pull request #10217: KAFKA-12254: Ensure MM2 creates topics with source topic configs

Posted by GitBox <gi...@apache.org>.
dhruvilshah3 commented on a change in pull request #10217:
URL: https://github.com/apache/kafka/pull/10217#discussion_r583448586



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
##########
@@ -309,18 +309,37 @@ private void createOffsetSyncsTopic() {
     // visible for testing
     void computeAndCreateTopicPartitions()
             throws InterruptedException, ExecutionException {
-        Map<String, Long> partitionCounts = knownSourceTopicPartitions.stream()
+        Map<String, Long> topicToParitionCount = knownSourceTopicPartitions.stream()
             .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream()
             .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), Entry::getValue));
         Set<String> knownTargetTopics = toTopics(knownTargetTopicPartitions);
-        List<NewTopic> newTopics = partitionCounts.entrySet().stream()
-            .filter(x -> !knownTargetTopics.contains(x.getKey()))
-            .map(x -> new NewTopic(x.getKey(), x.getValue().intValue(), (short) replicationFactor))
+
+        // get the set of topics that are not present on the target
+        Set<String> topicsToCreate = topicToParitionCount.keySet().stream()
+                .filter(topic -> !knownTargetTopics.contains(topic))
+                .collect(Collectors.toSet());
+        if (topicsToCreate.isEmpty())
+            return;
+
+        // get corresponding topic configurations from the source
+        Map<String, Config> configs = describeTopicConfigs(topicsToCreate);

Review comment:
       Addressed in db09806.







[GitHub] [kafka] rajinisivaram merged pull request #10217: KAFKA-12254: Ensure MM2 creates topics with source topic configs

Posted by GitBox <gi...@apache.org>.
rajinisivaram merged pull request #10217:
URL: https://github.com/apache/kafka/pull/10217


   





[GitHub] [kafka] dhruvilshah3 commented on pull request #10217: KAFKA-12254: Ensure MM2 creates topics with source topic configs

Posted by GitBox <gi...@apache.org>.
dhruvilshah3 commented on pull request #10217:
URL: https://github.com/apache/kafka/pull/10217#issuecomment-786895408


   Thanks for the review @rajinisivaram. I ran `MirrorConnectorsIntegrationSSLTest` a few times locally and it passed. It passed in the latest Jenkins run as well. A couple of failures in the latest run seem unrelated to the changes.





[GitHub] [kafka] dhruvilshah3 commented on pull request #10217: KAFKA-12254: Ensure MM2 creates topics with source topic configs

Posted by GitBox <gi...@apache.org>.
dhruvilshah3 commented on pull request #10217:
URL: https://github.com/apache/kafka/pull/10217#issuecomment-786418690


   cc @rajinisivaram @junrao for review





[GitHub] [kafka] dhruvilshah3 commented on a change in pull request #10217: KAFKA-12254: Ensure MM2 creates topics with source topic configs

Posted by GitBox <gi...@apache.org>.
dhruvilshah3 commented on a change in pull request #10217:
URL: https://github.com/apache/kafka/pull/10217#discussion_r583832959



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
##########
@@ -306,40 +307,84 @@ private void createOffsetSyncsTopic() {
         MirrorUtils.createSinglePartitionCompactedTopic(config.offsetSyncsTopic(), config.offsetSyncsTopicReplicationFactor(), config.sourceAdminConfig());
     }
 
-    // visible for testing
-    void computeAndCreateTopicPartitions()
-            throws InterruptedException, ExecutionException {
-        Map<String, Long> partitionCounts = knownSourceTopicPartitions.stream()
-            .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream()
-            .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), Entry::getValue));
-        Set<String> knownTargetTopics = toTopics(knownTargetTopicPartitions);
-        List<NewTopic> newTopics = partitionCounts.entrySet().stream()
-            .filter(x -> !knownTargetTopics.contains(x.getKey()))
-            .map(x -> new NewTopic(x.getKey(), x.getValue().intValue(), (short) replicationFactor))
-            .collect(Collectors.toList());
-        Map<String, NewPartitions> newPartitions = partitionCounts.entrySet().stream()
-            .filter(x -> knownTargetTopics.contains(x.getKey()))
-            .collect(Collectors.toMap(Entry::getKey, x -> NewPartitions.increaseTo(x.getValue().intValue())));
-        createTopicPartitions(partitionCounts, newTopics, newPartitions);
+    void computeAndCreateTopicPartitions() throws ExecutionException, InterruptedException {
+        // get source and target topics with respective partition counts
+        Map<String, Long> sourceTopicToPartitionCounts = knownSourceTopicPartitions.stream()
+                .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream()
+                .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+        Map<String, Long> targetTopicToPartitionCounts = knownTargetTopicPartitions.stream()
+                .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream()
+                .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+
+        Set<String> knownSourceTopics = sourceTopicToPartitionCounts.keySet();
+        Set<String> knownTargetTopics = targetTopicToPartitionCounts.keySet();
+        Map<String, String> sourceToRemoteTopics = knownSourceTopics.stream()
+                .collect(Collectors.toMap(Function.identity(), sourceTopic -> formatRemoteTopic(sourceTopic)));
+
+        // compute existing and new source topics
+        Map<Boolean, Set<String>> partitionedSourceTopics = knownSourceTopics.stream()
+                .collect(Collectors.partitioningBy(sourceTopic -> knownTargetTopics.contains(sourceToRemoteTopics.get(sourceTopic)),
+                        Collectors.toSet()));
+        Set<String> existingSourceTopics = partitionedSourceTopics.get(true);
+        Set<String> newSourceTopics = partitionedSourceTopics.get(false);
+
+        // create new topics
+        if (!newSourceTopics.isEmpty())
+            createNewTopics(newSourceTopics, sourceTopicToPartitionCounts);
+
+        // compute topics with new partitions
+        Map<String, Long> sourceTopicsWithNewPartitions = existingSourceTopics.stream()
+                .filter(sourceTopic -> {
+                    String targetTopic = sourceToRemoteTopics.get(sourceTopic);
+                    return sourceTopicToPartitionCounts.get(sourceTopic) > targetTopicToPartitionCounts.get(targetTopic);
+                })
+                .collect(Collectors.toMap(Function.identity(), sourceTopicToPartitionCounts::get));
+
+        // create new partitions
+        if (!sourceTopicsWithNewPartitions.isEmpty()) {
+            Map<String, NewPartitions> newTargetPartitions = sourceTopicsWithNewPartitions.entrySet().stream()
+                    .collect(Collectors.toMap(sourceTopicAndPartitionCount -> sourceToRemoteTopics.get(sourceTopicAndPartitionCount.getKey()),
+                        sourceTopicAndPartitionCount -> NewPartitions.increaseTo(sourceTopicAndPartitionCount.getValue().intValue())));
+            createNewPartitions(newTargetPartitions);
+        }
+    }
+
+    private void createNewTopics(Set<String> newSourceTopics, Map<String, Long> sourceTopicToPartitionCounts)
+            throws ExecutionException, InterruptedException {
+        Map<String, Config> sourceTopicToConfig = describeTopicConfigs(newSourceTopics);
+        List<NewTopic> newTopics = newSourceTopics.stream()
+                .map(sourceTopic -> {
+                    String remoteTopic = formatRemoteTopic(sourceTopic);
+                    int partitionCount = sourceTopicToPartitionCounts.get(sourceTopic).intValue();
+                    Map<String, String> configs = configToMap(sourceTopicToConfig.get(sourceTopic));
+                    return new NewTopic(remoteTopic, partitionCount, (short) replicationFactor)
+                            .configs(configs);
+                })
+                .collect(Collectors.toList());
+        createNewTopics(newTopics);
     }
 
     // visible for testing
-    void createTopicPartitions(Map<String, Long> partitionCounts, List<NewTopic> newTopics,
-            Map<String, NewPartitions> newPartitions) {
+    void createNewTopics(List<NewTopic> newTopics) {
+        Map<String, NewTopic> newTopicMap = newTopics.stream()
+                .collect(Collectors.toMap(NewTopic::name, Function.identity()));

Review comment:
       Makes sense, I made the change.







[GitHub] [kafka] dhruvilshah3 commented on a change in pull request #10217: KAFKA-12254: Ensure MM2 creates topics with source topic configs

Posted by GitBox <gi...@apache.org>.
dhruvilshah3 commented on a change in pull request #10217:
URL: https://github.com/apache/kafka/pull/10217#discussion_r583397577



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
##########
@@ -309,18 +309,37 @@ private void createOffsetSyncsTopic() {
     // visible for testing
     void computeAndCreateTopicPartitions()
             throws InterruptedException, ExecutionException {
-        Map<String, Long> partitionCounts = knownSourceTopicPartitions.stream()
+        Map<String, Long> topicToParitionCount = knownSourceTopicPartitions.stream()
             .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream()
             .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), Entry::getValue));
         Set<String> knownTargetTopics = toTopics(knownTargetTopicPartitions);
-        List<NewTopic> newTopics = partitionCounts.entrySet().stream()
-            .filter(x -> !knownTargetTopics.contains(x.getKey()))
-            .map(x -> new NewTopic(x.getKey(), x.getValue().intValue(), (short) replicationFactor))
+
+        // get the set of topics that are not present on the target
+        Set<String> topicsToCreate = topicToParitionCount.keySet().stream()
+                .filter(topic -> !knownTargetTopics.contains(topic))
+                .collect(Collectors.toSet());
+        if (topicsToCreate.isEmpty())
+            return;
+
+        // get corresponding topic configurations from the source
+        Map<String, Config> configs = describeTopicConfigs(topicsToCreate);

Review comment:
       This is not the right way to fetch the topic configs: the names in `topicsToCreate` have already been formatted using `formatRemoteTopic`, so they no longer match the topic names on the source cluster.
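
The mismatch can be sketched with plain Java collections. Everything here is a hypothetical stand-in: `formatRemoteTopic` is assumed to prefix a source-cluster alias (`source.`), and `describeOnSource` simulates a config lookup against the source cluster by returning entries only for names that exist there. A real admin client would typically report an error for an unknown topic rather than return nothing.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class RemoteNameLookupSketch {
    // Hypothetical stand-in: assumes remote names are "<alias>.<topic>".
    static String formatRemoteTopic(String sourceTopic) {
        return "source." + sourceTopic;
    }

    // Hypothetical stand-in for a config lookup on the source cluster:
    // only names that exist on the source yield a result.
    static Map<String, Map<String, String>> describeOnSource(
            Map<String, Map<String, String>> sourceCluster, Set<String> names) {
        Map<String, Map<String, String>> found = new HashMap<>();
        for (String name : names) {
            Map<String, String> configs = sourceCluster.get(name);
            if (configs != null) {
                found.put(name, configs);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> sourceCluster = new HashMap<>();
        sourceCluster.put("orders", Collections.singletonMap("cleanup.policy", "compact"));

        // Looking up by the formatted (remote) name finds nothing on the source.
        Set<String> remoteNames = Collections.singleton(formatRemoteTopic("orders"));
        System.out.println(describeOnSource(sourceCluster, remoteNames).isEmpty());

        // Looking up by the original source name finds the configs.
        Set<String> sourceNames = Collections.singleton("orders");
        System.out.println(describeOnSource(sourceCluster, sourceNames).containsKey("orders"));
    }
}
```

The later revision of the diff in this thread avoids the problem by keeping source names as map keys and calling `formatRemoteTopic` only when building each `NewTopic`.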







[GitHub] [kafka] rajinisivaram commented on a change in pull request #10217: KAFKA-12254: Ensure MM2 creates topics with source topic configs

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #10217:
URL: https://github.com/apache/kafka/pull/10217#discussion_r583591967



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
##########
@@ -306,40 +307,84 @@ private void createOffsetSyncsTopic() {
         MirrorUtils.createSinglePartitionCompactedTopic(config.offsetSyncsTopic(), config.offsetSyncsTopicReplicationFactor(), config.sourceAdminConfig());
     }
 
-    // visible for testing
-    void computeAndCreateTopicPartitions()
-            throws InterruptedException, ExecutionException {
-        Map<String, Long> partitionCounts = knownSourceTopicPartitions.stream()
-            .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream()
-            .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), Entry::getValue));
-        Set<String> knownTargetTopics = toTopics(knownTargetTopicPartitions);
-        List<NewTopic> newTopics = partitionCounts.entrySet().stream()
-            .filter(x -> !knownTargetTopics.contains(x.getKey()))
-            .map(x -> new NewTopic(x.getKey(), x.getValue().intValue(), (short) replicationFactor))
-            .collect(Collectors.toList());
-        Map<String, NewPartitions> newPartitions = partitionCounts.entrySet().stream()
-            .filter(x -> knownTargetTopics.contains(x.getKey()))
-            .collect(Collectors.toMap(Entry::getKey, x -> NewPartitions.increaseTo(x.getValue().intValue())));
-        createTopicPartitions(partitionCounts, newTopics, newPartitions);
+    void computeAndCreateTopicPartitions() throws ExecutionException, InterruptedException {
+        // get source and target topics with respective partition counts
+        Map<String, Long> sourceTopicToPartitionCounts = knownSourceTopicPartitions.stream()
+                .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream()
+                .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+        Map<String, Long> targetTopicToPartitionCounts = knownTargetTopicPartitions.stream()
+                .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream()
+                .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+
+        Set<String> knownSourceTopics = sourceTopicToPartitionCounts.keySet();
+        Set<String> knownTargetTopics = targetTopicToPartitionCounts.keySet();
+        Map<String, String> sourceToRemoteTopics = knownSourceTopics.stream()
+                .collect(Collectors.toMap(Function.identity(), sourceTopic -> formatRemoteTopic(sourceTopic)));
+
+        // compute existing and new source topics
+        Map<Boolean, Set<String>> partitionedSourceTopics = knownSourceTopics.stream()
+                .collect(Collectors.partitioningBy(sourceTopic -> knownTargetTopics.contains(sourceToRemoteTopics.get(sourceTopic)),
+                        Collectors.toSet()));
+        Set<String> existingSourceTopics = partitionedSourceTopics.get(true);
+        Set<String> newSourceTopics = partitionedSourceTopics.get(false);
+
+        // create new topics
+        if (!newSourceTopics.isEmpty())
+            createNewTopics(newSourceTopics, sourceTopicToPartitionCounts);
+
+        // compute topics with new partitions
+        Map<String, Long> sourceTopicsWithNewPartitions = existingSourceTopics.stream()
+                .filter(sourceTopic -> {
+                    String targetTopic = sourceToRemoteTopics.get(sourceTopic);
+                    return sourceTopicToPartitionCounts.get(sourceTopic) > targetTopicToPartitionCounts.get(targetTopic);
+                })
+                .collect(Collectors.toMap(Function.identity(), sourceTopicToPartitionCounts::get));
+
+        // create new partitions
+        if (!sourceTopicsWithNewPartitions.isEmpty()) {
+            Map<String, NewPartitions> newTargetPartitions = sourceTopicsWithNewPartitions.entrySet().stream()
+                    .collect(Collectors.toMap(sourceTopicAndPartitionCount -> sourceToRemoteTopics.get(sourceTopicAndPartitionCount.getKey()),
+                        sourceTopicAndPartitionCount -> NewPartitions.increaseTo(sourceTopicAndPartitionCount.getValue().intValue())));
+            createNewPartitions(newTargetPartitions);
+        }
+    }
+
+    private void createNewTopics(Set<String> newSourceTopics, Map<String, Long> sourceTopicToPartitionCounts)
+            throws ExecutionException, InterruptedException {
+        Map<String, Config> sourceTopicToConfig = describeTopicConfigs(newSourceTopics);
+        List<NewTopic> newTopics = newSourceTopics.stream()
+                .map(sourceTopic -> {
+                    String remoteTopic = formatRemoteTopic(sourceTopic);
+                    int partitionCount = sourceTopicToPartitionCounts.get(sourceTopic).intValue();
+                    Map<String, String> configs = configToMap(sourceTopicToConfig.get(sourceTopic));
+                    return new NewTopic(remoteTopic, partitionCount, (short) replicationFactor)
+                            .configs(configs);
+                })
+                .collect(Collectors.toList());
+        createNewTopics(newTopics);
     }
 
     // visible for testing
-    void createTopicPartitions(Map<String, Long> partitionCounts, List<NewTopic> newTopics,
-            Map<String, NewPartitions> newPartitions) {
+    void createNewTopics(List<NewTopic> newTopics) {
+        Map<String, NewTopic> newTopicMap = newTopics.stream()
+                .collect(Collectors.toMap(NewTopic::name, Function.identity()));

Review comment:
       Should we make this `void createNewTopics(Map<String, NewTopic> newTopics)` and just create the map in the caller, since we seem to be creating a list and then transforming it?
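
A rough sketch of the two shapes being compared, using only the Java standard library. `TopicRequest` is a hypothetical stand-in for `NewTopic`, and `listThenMap` / `directMap` are illustrative names that do not appear in the PR.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class MapInCallerSketch {
    /** Hypothetical stand-in for NewTopic: just a name and a partition count. */
    static final class TopicRequest {
        final String name;
        final int partitions;

        TopicRequest(String name, int partitions) {
            this.name = name;
            this.partitions = partitions;
        }
    }

    // Shape in the quoted diff: build a list first, then re-key it by name.
    static Map<String, TopicRequest> listThenMap(Set<String> topics, Map<String, Integer> counts) {
        List<TopicRequest> list = topics.stream()
                .map(topic -> new TopicRequest(topic, counts.get(topic)))
                .collect(Collectors.toList());
        return list.stream()
                .collect(Collectors.toMap(request -> request.name, Function.identity()));
    }

    // Suggested shape: the caller collects straight into a map keyed by name.
    static Map<String, TopicRequest> directMap(Set<String> topics, Map<String, Integer> counts) {
        return topics.stream()
                .collect(Collectors.toMap(Function.identity(),
                        topic -> new TopicRequest(topic, counts.get(topic))));
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("source.orders", 3);
        counts.put("source.payments", 6);

        Map<String, TopicRequest> viaList = listThenMap(counts.keySet(), counts);
        Map<String, TopicRequest> direct = directMap(counts.keySet(), counts);
        // Both shapes yield the same keys; the second skips the intermediate list.
        System.out.println(viaList.keySet().equals(direct.keySet()));
    }
}
```

Both produce a map with the same keys and values; the second simply avoids the intermediate list and the second pass over it.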



