You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/26 12:08:07 UTC

[GitHub] [kafka] rajinisivaram commented on a change in pull request #10217: KAFKA-12254: Ensure MM2 creates topics with source topic configs

rajinisivaram commented on a change in pull request #10217:
URL: https://github.com/apache/kafka/pull/10217#discussion_r583591967



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
##########
@@ -306,40 +307,84 @@ private void createOffsetSyncsTopic() {
         MirrorUtils.createSinglePartitionCompactedTopic(config.offsetSyncsTopic(), config.offsetSyncsTopicReplicationFactor(), config.sourceAdminConfig());
     }
 
-    // visible for testing
-    void computeAndCreateTopicPartitions()
-            throws InterruptedException, ExecutionException {
-        Map<String, Long> partitionCounts = knownSourceTopicPartitions.stream()
-            .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream()
-            .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), Entry::getValue));
-        Set<String> knownTargetTopics = toTopics(knownTargetTopicPartitions);
-        List<NewTopic> newTopics = partitionCounts.entrySet().stream()
-            .filter(x -> !knownTargetTopics.contains(x.getKey()))
-            .map(x -> new NewTopic(x.getKey(), x.getValue().intValue(), (short) replicationFactor))
-            .collect(Collectors.toList());
-        Map<String, NewPartitions> newPartitions = partitionCounts.entrySet().stream()
-            .filter(x -> knownTargetTopics.contains(x.getKey()))
-            .collect(Collectors.toMap(Entry::getKey, x -> NewPartitions.increaseTo(x.getValue().intValue())));
-        createTopicPartitions(partitionCounts, newTopics, newPartitions);
+    void computeAndCreateTopicPartitions() throws ExecutionException, InterruptedException {
+        // get source and target topics with respective partition counts
+        Map<String, Long> sourceTopicToPartitionCounts = knownSourceTopicPartitions.stream()
+                .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream()
+                .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+        Map<String, Long> targetTopicToPartitionCounts = knownTargetTopicPartitions.stream()
+                .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream()
+                .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+
+        Set<String> knownSourceTopics = sourceTopicToPartitionCounts.keySet();
+        Set<String> knownTargetTopics = targetTopicToPartitionCounts.keySet();
+        Map<String, String> sourceToRemoteTopics = knownSourceTopics.stream()
+                .collect(Collectors.toMap(Function.identity(), sourceTopic -> formatRemoteTopic(sourceTopic)));
+
+        // compute existing and new source topics
+        Map<Boolean, Set<String>> partitionedSourceTopics = knownSourceTopics.stream()
+                .collect(Collectors.partitioningBy(sourceTopic -> knownTargetTopics.contains(sourceToRemoteTopics.get(sourceTopic)),
+                        Collectors.toSet()));
+        Set<String> existingSourceTopics = partitionedSourceTopics.get(true);
+        Set<String> newSourceTopics = partitionedSourceTopics.get(false);
+
+        // create new topics
+        if (!newSourceTopics.isEmpty())
+            createNewTopics(newSourceTopics, sourceTopicToPartitionCounts);
+
+        // compute topics with new partitions
+        Map<String, Long> sourceTopicsWithNewPartitions = existingSourceTopics.stream()
+                .filter(sourceTopic -> {
+                    String targetTopic = sourceToRemoteTopics.get(sourceTopic);
+                    return sourceTopicToPartitionCounts.get(sourceTopic) > targetTopicToPartitionCounts.get(targetTopic);
+                })
+                .collect(Collectors.toMap(Function.identity(), sourceTopicToPartitionCounts::get));
+
+        // create new partitions
+        if (!sourceTopicsWithNewPartitions.isEmpty()) {
+            Map<String, NewPartitions> newTargetPartitions = sourceTopicsWithNewPartitions.entrySet().stream()
+                    .collect(Collectors.toMap(sourceTopicAndPartitionCount -> sourceToRemoteTopics.get(sourceTopicAndPartitionCount.getKey()),
+                        sourceTopicAndPartitionCount -> NewPartitions.increaseTo(sourceTopicAndPartitionCount.getValue().intValue())));
+            createNewPartitions(newTargetPartitions);
+        }
+    }
+
+    private void createNewTopics(Set<String> newSourceTopics, Map<String, Long> sourceTopicToPartitionCounts)
+            throws ExecutionException, InterruptedException {
+        Map<String, Config> sourceTopicToConfig = describeTopicConfigs(newSourceTopics);
+        List<NewTopic> newTopics = newSourceTopics.stream()
+                .map(sourceTopic -> {
+                    String remoteTopic = formatRemoteTopic(sourceTopic);
+                    int partitionCount = sourceTopicToPartitionCounts.get(sourceTopic).intValue();
+                    Map<String, String> configs = configToMap(sourceTopicToConfig.get(sourceTopic));
+                    return new NewTopic(remoteTopic, partitionCount, (short) replicationFactor)
+                            .configs(configs);
+                })
+                .collect(Collectors.toList());
+        createNewTopics(newTopics);
     }
 
     // visible for testing
-    void createTopicPartitions(Map<String, Long> partitionCounts, List<NewTopic> newTopics,
-            Map<String, NewPartitions> newPartitions) {
+    void createNewTopics(List<NewTopic> newTopics) {
+        Map<String, NewTopic> newTopicMap = newTopics.stream()
+                .collect(Collectors.toMap(NewTopic::name, Function.identity()));

Review comment:
       Should we make this `void createNewTopics(Map<String, NewTopic> newTopics)` and just create the map in the caller, since we seem to be creating a list and then transforming it into a map?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org