You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/10/19 16:17:13 UTC

[kafka] branch 2.7 updated: KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 09e5282  KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343)
09e5282 is described below

commit 09e52824d0996074b476ceca7d05dba0c17ce548
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Mon Oct 19 17:51:44 2020 +0200

    KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343)
    
    Trigger task reconfiguration when:
    - topic-partitions are created or deleted on source cluster
    - topic-partitions are missing on target cluster
    
    Authors: Mickael Maison <mi...@gmail.com>, Edoardo Comar <ec...@uk.ibm.com>
    Reviewer: Randall Hauch <rh...@gmail.com>
---
 .../connect/mirror/MirrorSourceConnector.java      | 52 ++++++++++++++++------
 .../connect/mirror/MirrorSourceConnectorTest.java  | 31 ++++++++++++-
 2 files changed, 67 insertions(+), 16 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index e6d6922..0f6eb46 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -96,7 +96,7 @@ public class MirrorSourceConnector extends SourceConnector {
         this.replicationPolicy = replicationPolicy;
         this.topicFilter = topicFilter;
         this.configPropertyFilter = configPropertyFilter;
-    } 
+    }
 
     @Override
     public void start(Map<String, String> props) {
@@ -202,6 +202,7 @@ public class MirrorSourceConnector extends SourceConnector {
             throws InterruptedException, ExecutionException {
         Set<String> topics = listTopics(targetAdminClient).stream()
             .filter(t -> sourceAndTarget.source().equals(replicationPolicy.topicSource(t)))
+            .filter(t -> !t.equals(config.checkpointsTopic()))
             .collect(Collectors.toSet());
         return describeTopics(targetAdminClient, topics).stream()
                 .flatMap(MirrorSourceConnector::expandTopicDescription)
@@ -211,21 +212,44 @@ public class MirrorSourceConnector extends SourceConnector {
     // visible for testing
     void refreshTopicPartitions()
             throws InterruptedException, ExecutionException {
-        knownSourceTopicPartitions = findSourceTopicPartitions();
-        knownTargetTopicPartitions = findTargetTopicPartitions();
-        List<TopicPartition> upstreamTargetTopicPartitions = knownTargetTopicPartitions.stream()
+
+        List<TopicPartition> sourceTopicPartitions = findSourceTopicPartitions();
+        List<TopicPartition> targetTopicPartitions = findTargetTopicPartitions();
+
+        Set<TopicPartition> sourceTopicPartitionsSet = new HashSet<>(sourceTopicPartitions);
+        Set<TopicPartition> knownSourceTopicPartitionsSet = new HashSet<>(knownSourceTopicPartitions);
+
+        Set<TopicPartition> upstreamTargetTopicPartitions = targetTopicPartitions.stream()
                 .map(x -> new TopicPartition(replicationPolicy.upstreamTopic(x.topic()), x.partition()))
-                .collect(Collectors.toList());
+                .collect(Collectors.toSet());
+
+        Set<TopicPartition> missingInTarget = new HashSet<>(sourceTopicPartitions);
+        missingInTarget.removeAll(upstreamTargetTopicPartitions);
+
+        knownTargetTopicPartitions = targetTopicPartitions;
+
+        // Detect if topic-partitions were added or deleted from the source cluster
+        // or if topic-partitions are missing from the target cluster
+        if (!knownSourceTopicPartitionsSet.equals(sourceTopicPartitionsSet) || !missingInTarget.isEmpty()) {
+
+            Set<TopicPartition> newTopicPartitions = sourceTopicPartitionsSet;
+            newTopicPartitions.removeAll(knownSourceTopicPartitions);
+
+            Set<TopicPartition> deletedTopicPartitions = knownSourceTopicPartitionsSet;
+            deletedTopicPartitions.removeAll(sourceTopicPartitions);
+
+            log.info("Found {} new topic-partitions on {}. " +
+                     "Found {} deleted topic-partitions on {}. " +
+                     "Found {} topic-partitions missing on {}.",
+                     newTopicPartitions.size(), sourceAndTarget.source(),
+                     deletedTopicPartitions.size(), sourceAndTarget.source(),
+                     missingInTarget.size(), sourceAndTarget.target());
+
+            log.trace("Found new topic-partitions on {}: {}", sourceAndTarget.source(), newTopicPartitions);
+            log.trace("Found deleted topic-partitions on {}: {}", sourceAndTarget.source(), deletedTopicPartitions);
+            log.trace("Found missing topic-partitions on {}: {}", sourceAndTarget.target(), missingInTarget);
 
-        Set<TopicPartition> newTopicPartitions = new HashSet<>(knownSourceTopicPartitions);
-        newTopicPartitions.removeAll(upstreamTargetTopicPartitions);
-        Set<TopicPartition> deadTopicPartitions = new HashSet<>(upstreamTargetTopicPartitions);
-        deadTopicPartitions.removeAll(knownSourceTopicPartitions);
-        if (!newTopicPartitions.isEmpty() || !deadTopicPartitions.isEmpty()) {
-            log.info("Found {} topic-partitions on {}. {} are new. {} were removed. Previously had {}.",
-                    knownSourceTopicPartitions.size(), sourceAndTarget.source(), newTopicPartitions.size(),
-                    deadTopicPartitions.size(), knownSourceTopicPartitions.size());
-            log.trace("Found new topic-partitions: {}", newTopicPartitions);
+            knownSourceTopicPartitions = sourceTopicPartitions;
             computeAndCreateTopicPartitions();
             context.requestTaskReconfiguration();
         }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index e86d21e..c915845 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -186,7 +186,7 @@ public class MirrorSourceConnectorTest {
         connector.initialize(mock(ConnectorContext.class));
         connector = spy(connector);
 
-        List<TopicPartition> sourceTopicPartitions = Arrays.asList(new TopicPartition("topic", 0));
+        List<TopicPartition> sourceTopicPartitions = Collections.singletonList(new TopicPartition("topic", 0));
         doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
         doReturn(Collections.emptyList()).when(connector).findTargetTopicPartitions();
         doNothing().when(connector).createTopicPartitions(any(), any(), any());
@@ -205,11 +205,38 @@ public class MirrorSourceConnectorTest {
                 eq(expectedNewTopics),
                 eq(Collections.emptyMap()));
 
-        List<TopicPartition> targetTopicPartitions = Arrays.asList(new TopicPartition("source.topic", 0));
+        List<TopicPartition> targetTopicPartitions = Collections.singletonList(new TopicPartition("source.topic", 0));
         doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
         connector.refreshTopicPartitions();
 
         // once target topic is created, refreshTopicPartitions() will NOT call computeAndCreateTopicPartitions() again
         verify(connector, times(2)).computeAndCreateTopicPartitions();
     }
+
+    @Test
+    public void testRefreshTopicPartitionsTopicOnTargetFirst() throws Exception {
+        MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
+        connector.initialize(mock(ConnectorContext.class));
+        connector = spy(connector);
+
+        List<TopicPartition> sourceTopicPartitions = Collections.emptyList();
+        List<TopicPartition> targetTopicPartitions = Collections.singletonList(new TopicPartition("source.topic", 0));
+        doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
+        doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
+        doNothing().when(connector).createTopicPartitions(any(), any(), any());
+
+        // partitions appearing on the target cluster should not cause reconfiguration
+        connector.refreshTopicPartitions();
+        connector.refreshTopicPartitions();
+        verify(connector, times(0)).computeAndCreateTopicPartitions();
+
+        sourceTopicPartitions = Collections.singletonList(new TopicPartition("topic", 0));
+        doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
+
+        // when partitions are added to the source cluster, reconfiguration is triggered
+        connector.refreshTopicPartitions();
+        verify(connector, times(1)).computeAndCreateTopicPartitions();
+
+    }
 }