You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/10/19 16:17:13 UTC
[kafka] branch 2.7 updated: KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 09e5282 KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343)
09e5282 is described below
commit 09e52824d0996074b476ceca7d05dba0c17ce548
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Mon Oct 19 17:51:44 2020 +0200
KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343)
Trigger task reconfiguration when:
- topic-partitions are created or deleted on source cluster
- topic-partitions are missing on target cluster
Authors: Mickael Maison <mi...@gmail.com>, Edoardo Comar <ec...@uk.ibm.com>
Reviewer: Randall Hauch <rh...@gmail.com>
---
.../connect/mirror/MirrorSourceConnector.java | 52 ++++++++++++++++------
.../connect/mirror/MirrorSourceConnectorTest.java | 31 ++++++++++++-
2 files changed, 67 insertions(+), 16 deletions(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index e6d6922..0f6eb46 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -96,7 +96,7 @@ public class MirrorSourceConnector extends SourceConnector {
this.replicationPolicy = replicationPolicy;
this.topicFilter = topicFilter;
this.configPropertyFilter = configPropertyFilter;
- }
+ }
@Override
public void start(Map<String, String> props) {
@@ -202,6 +202,7 @@ public class MirrorSourceConnector extends SourceConnector {
throws InterruptedException, ExecutionException {
Set<String> topics = listTopics(targetAdminClient).stream()
.filter(t -> sourceAndTarget.source().equals(replicationPolicy.topicSource(t)))
+ .filter(t -> !t.equals(config.checkpointsTopic()))
.collect(Collectors.toSet());
return describeTopics(targetAdminClient, topics).stream()
.flatMap(MirrorSourceConnector::expandTopicDescription)
@@ -211,21 +212,44 @@ public class MirrorSourceConnector extends SourceConnector {
// visible for testing
void refreshTopicPartitions()
throws InterruptedException, ExecutionException {
- knownSourceTopicPartitions = findSourceTopicPartitions();
- knownTargetTopicPartitions = findTargetTopicPartitions();
- List<TopicPartition> upstreamTargetTopicPartitions = knownTargetTopicPartitions.stream()
+
+ List<TopicPartition> sourceTopicPartitions = findSourceTopicPartitions();
+ List<TopicPartition> targetTopicPartitions = findTargetTopicPartitions();
+
+ Set<TopicPartition> sourceTopicPartitionsSet = new HashSet<>(sourceTopicPartitions);
+ Set<TopicPartition> knownSourceTopicPartitionsSet = new HashSet<>(knownSourceTopicPartitions);
+
+ Set<TopicPartition> upstreamTargetTopicPartitions = targetTopicPartitions.stream()
.map(x -> new TopicPartition(replicationPolicy.upstreamTopic(x.topic()), x.partition()))
- .collect(Collectors.toList());
+ .collect(Collectors.toSet());
+
+ Set<TopicPartition> missingInTarget = new HashSet<>(sourceTopicPartitions);
+ missingInTarget.removeAll(upstreamTargetTopicPartitions);
+
+ knownTargetTopicPartitions = targetTopicPartitions;
+
+ // Detect if topic-partitions were added or deleted from the source cluster
+ // or if topic-partitions are missing from the target cluster
+ if (!knownSourceTopicPartitionsSet.equals(sourceTopicPartitionsSet) || !missingInTarget.isEmpty()) {
+
+ Set<TopicPartition> newTopicPartitions = sourceTopicPartitionsSet;
+ newTopicPartitions.removeAll(knownSourceTopicPartitions);
+
+ Set<TopicPartition> deletedTopicPartitions = knownSourceTopicPartitionsSet;
+ deletedTopicPartitions.removeAll(sourceTopicPartitions);
+
+ log.info("Found {} new topic-partitions on {}. " +
+ "Found {} deleted topic-partitions on {}. " +
+ "Found {} topic-partitions missing on {}.",
+ newTopicPartitions.size(), sourceAndTarget.source(),
+ deletedTopicPartitions.size(), sourceAndTarget.source(),
+ missingInTarget.size(), sourceAndTarget.target());
+
+ log.trace("Found new topic-partitions on {}: {}", sourceAndTarget.source(), newTopicPartitions);
+ log.trace("Found deleted topic-partitions on {}: {}", sourceAndTarget.source(), deletedTopicPartitions);
+ log.trace("Found missing topic-partitions on {}: {}", sourceAndTarget.target(), missingInTarget);
- Set<TopicPartition> newTopicPartitions = new HashSet<>(knownSourceTopicPartitions);
- newTopicPartitions.removeAll(upstreamTargetTopicPartitions);
- Set<TopicPartition> deadTopicPartitions = new HashSet<>(upstreamTargetTopicPartitions);
- deadTopicPartitions.removeAll(knownSourceTopicPartitions);
- if (!newTopicPartitions.isEmpty() || !deadTopicPartitions.isEmpty()) {
- log.info("Found {} topic-partitions on {}. {} are new. {} were removed. Previously had {}.",
- knownSourceTopicPartitions.size(), sourceAndTarget.source(), newTopicPartitions.size(),
- deadTopicPartitions.size(), knownSourceTopicPartitions.size());
- log.trace("Found new topic-partitions: {}", newTopicPartitions);
+ knownSourceTopicPartitions = sourceTopicPartitions;
computeAndCreateTopicPartitions();
context.requestTaskReconfiguration();
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index e86d21e..c915845 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -186,7 +186,7 @@ public class MirrorSourceConnectorTest {
connector.initialize(mock(ConnectorContext.class));
connector = spy(connector);
- List<TopicPartition> sourceTopicPartitions = Arrays.asList(new TopicPartition("topic", 0));
+ List<TopicPartition> sourceTopicPartitions = Collections.singletonList(new TopicPartition("topic", 0));
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
doReturn(Collections.emptyList()).when(connector).findTargetTopicPartitions();
doNothing().when(connector).createTopicPartitions(any(), any(), any());
@@ -205,11 +205,38 @@ public class MirrorSourceConnectorTest {
eq(expectedNewTopics),
eq(Collections.emptyMap()));
- List<TopicPartition> targetTopicPartitions = Arrays.asList(new TopicPartition("source.topic", 0));
+ List<TopicPartition> targetTopicPartitions = Collections.singletonList(new TopicPartition("source.topic", 0));
doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
connector.refreshTopicPartitions();
// once target topic is created, refreshTopicPartitions() will NOT call computeAndCreateTopicPartitions() again
verify(connector, times(2)).computeAndCreateTopicPartitions();
}
+
+ @Test
+ public void testRefreshTopicPartitionsTopicOnTargetFirst() throws Exception {
+ MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
+ connector.initialize(mock(ConnectorContext.class));
+ connector = spy(connector);
+
+ List<TopicPartition> sourceTopicPartitions = Collections.emptyList();
+ List<TopicPartition> targetTopicPartitions = Collections.singletonList(new TopicPartition("source.topic", 0));
+ doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
+ doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
+ doNothing().when(connector).createTopicPartitions(any(), any(), any());
+
+ // partitions appearing on the target cluster should not cause reconfiguration
+ connector.refreshTopicPartitions();
+ connector.refreshTopicPartitions();
+ verify(connector, times(0)).computeAndCreateTopicPartitions();
+
+ sourceTopicPartitions = Collections.singletonList(new TopicPartition("topic", 0));
+ doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
+
+ // when partitions are added to the source cluster, reconfiguration is triggered
+ connector.refreshTopicPartitions();
+ verify(connector, times(1)).computeAndCreateTopicPartitions();
+
+ }
}