Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/26 18:24:23 UTC

[GitHub] [kafka] mimaison opened a new pull request #10221: KAFKA-12379: Allow configuring the location of the offset-syncs topic…

mimaison opened a new pull request #10221:
URL: https://github.com/apache/kafka/pull/10221


   … with MirrorMaker2
   
   This commit implements KIP-716. It introduces a new setting `offset-syncs.topic.location` that allows specifying where the offset-syncs topic is created.
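
As a rough illustration of what the setting selects, the sketch below mirrors the topic names that the unit tests in this PR expect: with `source` (the default) the topic is named after the target cluster alias, and with `target` it is named after the source cluster alias. The class `OffsetSyncsTopicSketch`, its method, and the use of `IllegalArgumentException` (the real config throws `ConfigException`) are hypothetical stand-ins for illustration, not the actual `MirrorConnectorConfig` code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the naming logic; NOT the real MirrorConnectorConfig.
final class OffsetSyncsTopicSketch {
    static final String LOCATION_KEY = "offset-syncs.topic.location";

    // Returns the offset-syncs topic name for the configured location.
    // The aliases are passed explicitly here; this is an assumption made for the sketch.
    static String offsetSyncsTopic(Map<String, String> props, String sourceAlias, String targetAlias) {
        String location = props.getOrDefault(LOCATION_KEY, "source");
        switch (location) {
            case "source":
                // Topic lives on the source cluster and is named after the target alias.
                return "mm2-offset-syncs." + targetAlias + ".internal";
            case "target":
                // Topic lives on the target cluster and is named after the source alias.
                return "mm2-offset-syncs." + sourceAlias + ".internal";
            default:
                // The real config rejects this with ConfigException (assumed simplification).
                throw new IllegalArgumentException("Invalid " + LOCATION_KEY + ": " + location);
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(offsetSyncsTopic(props, "source1", "target2"));
        props.put(LOCATION_KEY, "target");
        System.out.println(offsetSyncsTopic(props, "source1", "target2"));
    }
}
```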
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on a change in pull request #10221: KAFKA-12379: Allow configuring the location of the offset-syncs topic…

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #10221:
URL: https://github.com/apache/kafka/pull/10221#discussion_r658639377



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
##########
@@ -262,4 +264,65 @@ public void testTargetAdminConfigWithSourcePrefix() {
         assertEquals(expectedAdminProps, connectorAdminProps, prefix + " source connector admin props not matching");
     }
 
+    @Test
+    public void testOffsetSyncsTopic() {
+        // Invalid location
+        Map<String, String> connectorProps = makeProps("offset-syncs.topic.location", "something");
+        assertThrows(ConfigException.class, () -> new MirrorConnectorConfig(connectorProps));
+
+        connectorProps.put("offset-syncs.topic.location", "source");
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.target2.internal", config.offsetSyncsTopic());
+        connectorProps.put("offset-syncs.topic.location", "target");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.source1.internal", config.offsetSyncsTopic());
+        // Default to source
+        connectorProps.remove("offset-syncs.topic.location");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.target2.internal", config.offsetSyncsTopic());
+    }
+
+    @Test
+    public void testConsumerConfigsForOffsetSyncsTopic() {
+        Map<String, String> connectorProps = makeProps(
+                "source.consumer.max.partition.fetch.bytes", "1",
+                "target.consumer.heartbeat.interval.ms", "1",
+                "consumer.max.poll.interval.ms", "1",
+                "fetch.min.bytes", "1"
+        );
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(config.sourceConsumerConfig(), config.offsetSyncsTopicConsumerConfig());
+        connectorProps.put("offset-syncs.topic.location", "target");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(config.targetConsumerConfig(), config.offsetSyncsTopicConsumerConfig());
+    }
+
+    @Test
+    public void testProducerConfigsForOffsetSyncsTopic() {
+        Map<String, String> connectorProps = makeProps(
+                "source.batch.size", "1",
+                "target.acks", "1",
+                "producer.max.poll.interval.ms", "1",
+                "fetch.min.bytes", "1"
+        );
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(config.sourceProducerConfig(), config.offsetSyncsTopicProducerConfig());
+        connectorProps.put("offset-syncs.topic.location", "target");
+        assertEquals(config.targetProducerConfig(), config.offsetSyncsTopicProducerConfig());
+    }
+
+    @Test
+    public void testAdminConfigsForOffsetSyncsTopic() {
+        Map<String, String> connectorProps = makeProps(
+                "source.request.timeout.ms", "1",
+                "target.send.buffer.bytes", "1",
+                "admin.reconnect.backoff.max.ms", "1",
+                "retries", "123"
+        );
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(config.sourceAdminConfig(), config.offsetSyncsTopicAdminConfig());
+        connectorProps.put("offset-syncs.topic.location", "target");

Review comment:
       Same thing here: updating `connectorProps` doesn't change the already instantiated `config`.

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
##########
@@ -262,4 +264,65 @@ public void testTargetAdminConfigWithSourcePrefix() {
         assertEquals(expectedAdminProps, connectorAdminProps, prefix + " source connector admin props not matching");
     }
 
+    @Test
+    public void testOffsetSyncsTopic() {
+        // Invalid location
+        Map<String, String> connectorProps = makeProps("offset-syncs.topic.location", "something");
+        assertThrows(ConfigException.class, () -> new MirrorConnectorConfig(connectorProps));
+
+        connectorProps.put("offset-syncs.topic.location", "source");
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.target2.internal", config.offsetSyncsTopic());
+        connectorProps.put("offset-syncs.topic.location", "target");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.source1.internal", config.offsetSyncsTopic());
+        // Default to source
+        connectorProps.remove("offset-syncs.topic.location");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.target2.internal", config.offsetSyncsTopic());
+    }
+
+    @Test
+    public void testConsumerConfigsForOffsetSyncsTopic() {
+        Map<String, String> connectorProps = makeProps(
+                "source.consumer.max.partition.fetch.bytes", "1",
+                "target.consumer.heartbeat.interval.ms", "1",
+                "consumer.max.poll.interval.ms", "1",
+                "fetch.min.bytes", "1"
+        );
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(config.sourceConsumerConfig(), config.offsetSyncsTopicConsumerConfig());
+        connectorProps.put("offset-syncs.topic.location", "target");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(config.targetConsumerConfig(), config.offsetSyncsTopicConsumerConfig());
+    }
+
+    @Test
+    public void testProducerConfigsForOffsetSyncsTopic() {
+        Map<String, String> connectorProps = makeProps(
+                "source.batch.size", "1",
+                "target.acks", "1",
+                "producer.max.poll.interval.ms", "1",
+                "fetch.min.bytes", "1"
+        );
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(config.sourceProducerConfig(), config.offsetSyncsTopicProducerConfig());
+        connectorProps.put("offset-syncs.topic.location", "target");

Review comment:
       Same thing here: updating `connectorProps` doesn't change the already instantiated `config`.







[GitHub] [kafka] mimaison commented on pull request #10221: KAFKA-12379: Allow configuring the location of the offset-syncs topic…

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #10221:
URL: https://github.com/apache/kafka/pull/10221#issuecomment-867792968


   @kkonstantine @tombentley As you voted on the KIP, can you take a look at this PR? Thanks





[GitHub] [kafka] mimaison commented on pull request #10221: KAFKA-12379: Allow configuring the location of the offset-syncs topic…

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #10221:
URL: https://github.com/apache/kafka/pull/10221#issuecomment-868375379


   Thanks @tombentley, I've pushed an update





[GitHub] [kafka] tombentley commented on a change in pull request #10221: KAFKA-12379: Allow configuring the location of the offset-syncs topic…

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #10221:
URL: https://github.com/apache/kafka/pull/10221#discussion_r658822455



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
##########
@@ -262,4 +264,65 @@ public void testTargetAdminConfigWithSourcePrefix() {
         assertEquals(expectedAdminProps, connectorAdminProps, prefix + " source connector admin props not matching");
     }
 
+    @Test
+    public void testOffsetSyncsTopic() {
+        // Invalid location
+        Map<String, String> connectorProps = makeProps("offset-syncs.topic.location", "something");
+        assertThrows(ConfigException.class, () -> new MirrorConnectorConfig(connectorProps));
+
+        connectorProps.put("offset-syncs.topic.location", "source");
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.target2.internal", config.offsetSyncsTopic());
+        connectorProps.put("offset-syncs.topic.location", "target");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.source1.internal", config.offsetSyncsTopic());
+        // Default to source
+        connectorProps.remove("offset-syncs.topic.location");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.target2.internal", config.offsetSyncsTopic());
+    }
+
+    @Test
+    public void testConsumerConfigsForOffsetSyncsTopic() {
+        Map<String, String> connectorProps = makeProps(
+                "source.consumer.max.partition.fetch.bytes", "1",
+                "target.consumer.heartbeat.interval.ms", "1",
+                "consumer.max.poll.interval.ms", "1",
+                "fetch.min.bytes", "1"
+        );
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(config.sourceConsumerConfig(), config.offsetSyncsTopicConsumerConfig());
+        connectorProps.put("offset-syncs.topic.location", "target");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(config.targetConsumerConfig(), config.offsetSyncsTopicConsumerConfig());
+    }
+
+    @Test
+    public void testProducerConfigsForOffsetSyncsTopic() {
+        Map<String, String> connectorProps = makeProps(
+                "source.batch.size", "1",
+                "target.acks", "1",
+                "producer.max.poll.interval.ms", "1",
+                "fetch.min.bytes", "1"
+        );
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(config.sourceProducerConfig(), config.offsetSyncsTopicProducerConfig());
+        connectorProps.put("offset-syncs.topic.location", "target");

Review comment:
       Well, maybe I should have pointed them _all_ out the first time.







[GitHub] [kafka] mimaison merged pull request #10221: KAFKA-12379: Allow configuring the location of the offset-syncs topic…

Posted by GitBox <gi...@apache.org>.
mimaison merged pull request #10221:
URL: https://github.com/apache/kafka/pull/10221


   





[GitHub] [kafka] tombentley commented on a change in pull request #10221: KAFKA-12379: Allow configuring the location of the offset-syncs topic…

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #10221:
URL: https://github.com/apache/kafka/pull/10221#discussion_r658502529



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
##########
@@ -246,4 +248,67 @@ public void testTargetAdminConfigWithSourcePrefix() {
         assertEquals(expectedAdminProps, connectorAdminProps);
     }
 
+    @Test
+    public void testOffsetSyncsTopic() {
+        // Invalid location
+        Map<String, String> connectorProps = makeProps("offset-syncs.topic.location", "something");
+        try {
+            new MirrorConnectorConfig(connectorProps);
+            fail("Should have thrown ConfigException");
+        } catch (ConfigException exc) { } // expected
+
+        connectorProps.put("offset-syncs.topic.location", "source");
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.target2.internal", config.offsetSyncsTopic());
+        connectorProps.put("offset-syncs.topic.location", "target");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.source1.internal", config.offsetSyncsTopic());
+        // Default to source
+        connectorProps.remove("offset-syncs.topic.location");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.target2.internal", config.offsetSyncsTopic());
+    }
+
+    @Test
+    public void testConsumerConfigsForOffsetSyncsTopic() {
+        Map<String, String> connectorProps = makeProps(
+                "source.max.partition.fetch.bytes", "1",
+                "target.heartbeat.interval.ms", "1",
+                "consumer.max.poll.interval.ms", "1",
+                "fetch.min.bytes", "1"
+        );
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(config.sourceConsumerConfig(), config.offsetSyncsTopicConsumerConfig());
+        connectorProps.put("offset-syncs.topic.location", "target");

Review comment:
       Adding to `connectorProps` won't change the already instantiated `config`.
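
A minimal sketch of the behaviour described in this comment, assuming the config object snapshots its input map at construction time (which is what the comment implies). `SnapshotConfigSketch` is a hypothetical class used only for illustration; it is not `MirrorConnectorConfig`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical config class that copies its input when constructed.
final class SnapshotConfigSketch {
    private final Map<String, String> values;

    SnapshotConfigSketch(Map<String, String> props) {
        // Defensive copy: later changes to props are not visible to this instance.
        this.values = new HashMap<>(props);
    }

    String location() {
        return values.getOrDefault("offset-syncs.topic.location", "source");
    }

    public static void main(String[] args) {
        Map<String, String> connectorProps = new HashMap<>();
        SnapshotConfigSketch config = new SnapshotConfigSketch(connectorProps);

        // Mutating the map after construction does not affect the existing config.
        connectorProps.put("offset-syncs.topic.location", "target");
        System.out.println(config.location()); // prints "source"

        // Re-instantiating from the updated map picks up the change.
        config = new SnapshotConfigSketch(connectorProps);
        System.out.println(config.location()); // prints "target"
    }
}
```

This is why the test needs a `config = new MirrorConnectorConfig(connectorProps);` line after the `put` and before the second assertion.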

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
##########
@@ -246,4 +248,67 @@ public void testTargetAdminConfigWithSourcePrefix() {
         assertEquals(expectedAdminProps, connectorAdminProps);
     }
 
+    @Test
+    public void testOffsetSyncsTopic() {
+        // Invalid location
+        Map<String, String> connectorProps = makeProps("offset-syncs.topic.location", "something");
+        try {
+            new MirrorConnectorConfig(connectorProps);
+            fail("Should have thrown ConfigException");
+        } catch (ConfigException exc) { } // expected

Review comment:
       Use `assertThrows` here instead of the try/fail/catch pattern.

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
##########
@@ -435,7 +438,40 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
         assertEquals(0, records.count(), "consumer record size is not zero");
         backupConsumer.close();
     }
-    
+
+    @Test
+    public void testOffsetSyncsTopicsOnTarget() throws Exception {
+        // move offset-syncs topics to target
+        mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".offset-syncs.topic.location", "target");
+        // one way replication from primary to backup
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
+
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // Ensure the offset syncs topic is created in the target cluster
+        waitForTopicCreated(backup.kafka(), "mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal");
+
+        produceMessages(primary, "test-topic-1");
+
+        // Check offsets are pushed to the checkpoint topic
+        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+                "auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
+        waitForCondition(() -> {
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
+                if ((PRIMARY_CLUSTER_ALIAS + ".test-topic-1").equals(checkpoint.topicPartition().topic())) {
+                    return true;
+                }
+            }
+            return false;
+        }, 30_000,
+            "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + "test-topic-1"
+        );

Review comment:
       Perhaps it's overkill, but maybe we should also assert at the end of the test that the offset-syncs topic doesn't exist in the source cluster?

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -202,6 +204,10 @@
     private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
     public static final long OFFSET_LAG_MAX_DEFAULT = 100L;
 
+    private static final String OFFSET_SYNCS_TOPIC_LOCATION = "offset-syncs.topic.location";
+    private static final Object OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT = SOURCE_CLUSTER_ALIAS_DEFAULT;

Review comment:
       This should be declared as `String`, not `Object`.







[GitHub] [kafka] edoardocomar commented on a change in pull request #10221: KAFKA-12379: Allow configuring the location of the offset-syncs topic…

Posted by GitBox <gi...@apache.org>.
edoardocomar commented on a change in pull request #10221:
URL: https://github.com/apache/kafka/pull/10221#discussion_r658094350



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -220,6 +220,9 @@ public MirrorClientConfig clientConfig(String cluster) {
         props.putIfAbsent(TARGET_CLUSTER_ALIAS, sourceAndTarget.target());
 
         // override with connector-level properties
+        Map<String, String> p = stringsWithPrefixStripped(sourceAndTarget.source() + "->"
+                + sourceAndTarget.target() + ".");
+        System.out.println(p);

Review comment:
       Would it be useful to log the content of the map at trace level and remove the `System.out.println`?







[GitHub] [kafka] mimaison commented on a change in pull request #10221: KAFKA-12379: Allow configuring the location of the offset-syncs topic…

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #10221:
URL: https://github.com/apache/kafka/pull/10221#discussion_r658821310



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
##########
@@ -262,4 +264,65 @@ public void testTargetAdminConfigWithSourcePrefix() {
         assertEquals(expectedAdminProps, connectorAdminProps, prefix + " source connector admin props not matching");
     }
 
+    @Test
+    public void testOffsetSyncsTopic() {
+        // Invalid location
+        Map<String, String> connectorProps = makeProps("offset-syncs.topic.location", "something");
+        assertThrows(ConfigException.class, () -> new MirrorConnectorConfig(connectorProps));
+
+        connectorProps.put("offset-syncs.topic.location", "source");
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.target2.internal", config.offsetSyncsTopic());
+        connectorProps.put("offset-syncs.topic.location", "target");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.source1.internal", config.offsetSyncsTopic());
+        // Default to source
+        connectorProps.remove("offset-syncs.topic.location");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.target2.internal", config.offsetSyncsTopic());
+    }
+
+    @Test
+    public void testConsumerConfigsForOffsetSyncsTopic() {
+        Map<String, String> connectorProps = makeProps(
+                "source.consumer.max.partition.fetch.bytes", "1",
+                "target.consumer.heartbeat.interval.ms", "1",
+                "consumer.max.poll.interval.ms", "1",
+                "fetch.min.bytes", "1"
+        );
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(config.sourceConsumerConfig(), config.offsetSyncsTopicConsumerConfig());
+        connectorProps.put("offset-syncs.topic.location", "target");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(config.targetConsumerConfig(), config.offsetSyncsTopicConsumerConfig());
+    }
+
+    @Test
+    public void testProducerConfigsForOffsetSyncsTopic() {
+        Map<String, String> connectorProps = makeProps(
+                "source.batch.size", "1",
+                "target.acks", "1",
+                "producer.max.poll.interval.ms", "1",
+                "fetch.min.bytes", "1"
+        );
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(config.sourceProducerConfig(), config.offsetSyncsTopicProducerConfig());
+        connectorProps.put("offset-syncs.topic.location", "target");

Review comment:
       Right, I should have seen that this morning when I updated the test just above! Thanks







[GitHub] [kafka] edoardocomar commented on a change in pull request #10221: KAFKA-12379: Allow configuring the location of the offset-syncs topic…

Posted by GitBox <gi...@apache.org>.
edoardocomar commented on a change in pull request #10221:
URL: https://github.com/apache/kafka/pull/10221#discussion_r658091005



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -202,6 +205,10 @@
     private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
     public static final long OFFSET_LAG_MAX_DEFAULT = 100L;
 
+    private static final String OFFSET_SYNCS_TOPIC_LOCATION = "offset-syncs.topic.location";
+    private static final Object OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT = SOURCE_CLUSTER_ALIAS_DEFAULT;
+    private static final String OFFSET_SYNCS_TOPIC_LOCATION_DOC = "The location of the offset-syncs topic.";
+

Review comment:
       Could this be a bit more verbose? For example:
   "The location (source/target) of the offset-syncs topic."







[GitHub] [kafka] edoardocomar commented on a change in pull request #10221: KAFKA-12379: Allow configuring the location of the offset-syncs topic…

Posted by GitBox <gi...@apache.org>.
edoardocomar commented on a change in pull request #10221:
URL: https://github.com/apache/kafka/pull/10221#discussion_r658097481



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
##########
@@ -234,6 +234,6 @@ public void testRefreshTopicPartitionsTopicOnTargetFirst() throws Exception {
         // when partitions are added to the source cluster, reconfiguration is triggered
         connector.refreshTopicPartitions();
         verify(connector, times(1)).computeAndCreateTopicPartitions();
-
     }
+

Review comment:
       This added blank line can be removed.



