You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Jakob Homan (Jira)" <ji...@apache.org> on 2020/06/05 23:02:00 UTC
[jira] [Commented] (KAFKA-10094) In MirrorSourceConnector replace
two-step assignment with single call
[ https://issues.apache.org/jira/browse/KAFKA-10094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17127142#comment-17127142 ]
Jakob Homan commented on KAFKA-10094:
-------------------------------------
Patch accepted and merged.
> In MirrorSourceConnector replace two-step assignment with single call
> ---------------------------------------------------------------------
>
> Key: KAFKA-10094
> URL: https://issues.apache.org/jira/browse/KAFKA-10094
> Project: Kafka
> Issue Type: Improvement
> Components: mirrormaker
> Reporter: Jakob Homan
> Assignee: Mandar Tillu
> Priority: Trivial
> Labels: newbie
>
> n.b. This is a newbie ticket designed to be an introduction to contributing for the assignee.
>
> In MirrorSourceConnector::refreshTopicPartitions we have places where we create a new HashSet and then addAll to the set. We can replace both with a direct call to the copy constructor.
>
> {code:java}
> void refreshTopicPartitions()
> throws InterruptedException, ExecutionException {
> knownSourceTopicPartitions = findSourceTopicPartitions();
> knownTargetTopicPartitions = findTargetTopicPartitions();
> List<TopicPartition> upstreamTargetTopicPartitions = knownTargetTopicPartitions.stream()
> .map(x -> new TopicPartition(replicationPolicy.upstreamTopic(x.topic()), x.partition()))
> .collect(Collectors.toList());
> Set<TopicPartition> newTopicPartitions = new HashSet<>();
> newTopicPartitions.addAll(knownSourceTopicPartitions);
> newTopicPartitions.removeAll(upstreamTargetTopicPartitions);
> Set<TopicPartition> deadTopicPartitions = new HashSet<>();
> deadTopicPartitions.addAll(upstreamTargetTopicPartitions);{code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)