You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2021/11/15 10:47:36 UTC

[kafka] branch trunk updated: KAFKA-13255: Use config.properties.exclude when mirroring topics (#11401)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8318786  KAFKA-13255: Use config.properties.exclude when mirroring topics (#11401)
8318786 is described below

commit 8318786b72ccc1e74538aa99f3c5b1c21258bbfb
Author: Ed B <bd...@gmail.com>
AuthorDate: Mon Nov 15 05:45:48 2021 -0500

    KAFKA-13255: Use config.properties.exclude when mirroring topics (#11401)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>
---
 .../connect/mirror/MirrorSourceConnector.java      |  5 ++-
 .../connect/mirror/MirrorSourceConnectorTest.java  | 46 +++++++++++++++++++++
 .../IdentityReplicationIntegrationTest.java        |  9 +++++
 .../MirrorConnectorsIntegrationBaseTest.java       | 47 +++++++++++++++++++++-
 4 files changed, 104 insertions(+), 3 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index b2cbe02..ee67330 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -349,14 +349,15 @@ public class MirrorSourceConnector extends SourceConnector {
         }
     }
 
-    private void createNewTopics(Set<String> newSourceTopics, Map<String, Long> sourceTopicToPartitionCounts)
+    // visible for testing
+    void createNewTopics(Set<String> newSourceTopics, Map<String, Long> sourceTopicToPartitionCounts)
             throws ExecutionException, InterruptedException {
         Map<String, Config> sourceTopicToConfig = describeTopicConfigs(newSourceTopics);
         Map<String, NewTopic> newTopics = newSourceTopics.stream()
                 .map(sourceTopic -> {
                     String remoteTopic = formatRemoteTopic(sourceTopic);
                     int partitionCount = sourceTopicToPartitionCounts.get(sourceTopic).intValue();
-                    Map<String, String> configs = configToMap(sourceTopicToConfig.get(sourceTopic));
+                    Map<String, String> configs = configToMap(targetConfig(sourceTopicToConfig.get(sourceTopic)));
                     return new NewTopic(remoteTopic, partitionCount, (short) replicationFactor)
                             .configs(configs);
                 })
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index b4c8ca6..7daa96b 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -36,7 +36,10 @@ import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.eq;
@@ -153,6 +156,49 @@ public class MirrorSourceConnectorTest {
     }
 
     @Test
+    public void testNewTopicConfigs() throws Exception {
+        Map<String, Object> filterConfig = new HashMap<>();
+        filterConfig.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "follower\\.replication\\.throttled\\.replicas, "
+                + "leader\\.replication\\.throttled\\.replicas, "
+                + "message\\.timestamp\\.difference\\.max\\.ms, "
+                + "message\\.timestamp\\.type, "
+                + "unclean\\.leader\\.election\\.enable, "
+                + "min\\.insync\\.replicas,"
+                + "exclude_param.*");
+        DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter();
+        filter.configure(filterConfig);
+
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), x -> true, filter));
+
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = new ArrayList<>();
+        entries.add(new ConfigEntry("name-1", "value-1"));
+        entries.add(new ConfigEntry("exclude_param.param1", "value-param1"));
+        entries.add(new ConfigEntry("min.insync.replicas", "2"));
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
+        doAnswer(invocation -> {
+            Map<String, NewTopic> newTopics = invocation.getArgument(0);
+            assertNotNull(newTopics.get("source." + topic));
+            Map<String, String> targetConfig = newTopics.get("source." + topic).configs();
+
+            // property 'name-1' isn't defined in the exclude filter -> should be replicated
+            assertNotNull(targetConfig.get("name-1"), "should replicate properties");
+
+            // this property is part of the default exclude list; double-check that it is filtered out:
+            String prop1 = "min.insync.replicas";
+            assertNull(targetConfig.get(prop1), "should not replicate excluded properties " + prop1);
+            // this property is matched only by the custom exclude pattern, which also exercises regex matching:
+            String prop2 = "exclude_param.param1";
+            assertNull(targetConfig.get(prop2), "should not replicate excluded properties " + prop2);
+            return null;
+        }).when(connector).createNewTopics(any());
+        connector.createNewTopics(Collections.singleton(topic), Collections.singletonMap(topic, 1L));
+        verify(connector).createNewTopics(any(), any());
+    }
+
+    @Test
     public void testMirrorSourceConnectorTaskConfig() {
         List<TopicPartition> knownSourceTopicPartitions = new ArrayList<>();
 
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index 43b1fcb..9e60e48 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -86,6 +86,7 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
         waitForTopicCreated(backup, "test-topic-1");
         assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG),
                 "topic config was not synced");
+        createAndTestNewTopicWithConfigFilter();
 
         assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
                 "Records were not produced to primary cluster.");
@@ -260,4 +261,12 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
         assertEquals(0, records.count(), "consumer record size is not zero");
         backupConsumer.close();
     }
+
+    /*
+     * Returns the expected topic name on the target cluster; here the source topic name is returned unchanged.
+     */
+    @Override
+    String backupClusterTopicName(String topic) {
+        return topic;
+    }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 6fb7a81..8f692ca 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
 import org.apache.kafka.connect.mirror.MirrorClient;
 import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
 import org.apache.kafka.connect.mirror.MirrorMakerConfig;
@@ -60,6 +61,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -155,6 +157,9 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
         mm2Props.putAll(basicMM2Config());
         mm2Props.putAll(additionalMM2Config);
 
+        // exclude topic config:
+        mm2Props.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "delete\\.retention\\..*");
+
         mm2Config = new MirrorMakerConfig(mm2Props);
         primaryWorkerProps = mm2Config.workerConfig(new SourceAndTarget(BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS));
         backupWorkerProps.putAll(mm2Config.workerConfig(new SourceAndTarget(PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS)));
@@ -259,7 +264,8 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
         waitForTopicCreated(primary, "mm2-offset-syncs.backup.internal");
         assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "primary.test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG),
                 "topic config was not synced");
-        
+        createAndTestNewTopicWithConfigFilter();
+
         assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
             "Records were not produced to primary cluster.");
         assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1").count(),
@@ -429,6 +435,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
         // one way replication from primary to backup
         mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
 
+
         mm2Config = new MirrorMakerConfig(mm2Props);
 
         waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
@@ -520,6 +527,44 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
     }
 
     /*
+     * Creates a topic with custom configs and verifies that the exclude filter is applied when topic configurations are copied
+     */
+    void createAndTestNewTopicWithConfigFilter() throws Exception {
+        // create topic with configuration to test:
+        final Map<String, String> topicConfig = new HashMap<>();
+        topicConfig.put("delete.retention.ms", "1000"); // should be excluded (default value is 86400000)
+        topicConfig.put("retention.bytes", "1000"); // should be included, default value is -1
+
+        final String topic = "test-topic-with-config";
+        final String backupTopic = backupClusterTopicName(topic);
+
+        primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig);
+        waitForTopicCreated(backup, backupTopic);
+
+        String primaryConfig, backupConfig;
+
+        primaryConfig = getTopicConfig(primary.kafka(), topic, "delete.retention.ms");
+        backupConfig = getTopicConfig(backup.kafka(), backupTopic, "delete.retention.ms");
+        assertNotEquals(primaryConfig, backupConfig,
+                "`delete.retention.ms` should be different, because it's in exclude filter! ");
+
+        // regression test for the configs that are still supposed to be replicated
+        primaryConfig = getTopicConfig(primary.kafka(), topic, "retention.bytes");
+        backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes");
+        assertEquals(primaryConfig, backupConfig,
+                "`retention.bytes` should be the same, because it isn't in exclude filter! ");
+        assertEquals("1000", backupConfig,
+                "`retention.bytes` should be the same, because it's explicitly defined! ");
+    }
+
+    /*
+     * Returns the expected topic name on the target cluster: the source topic prefixed with the primary cluster alias.
+     */
+    String backupClusterTopicName(String topic) {
+        return PRIMARY_CLUSTER_ALIAS + "." + topic;
+    }
+
+    /*
      * launch the connectors on kafka connect cluster and check if they are running
      */
     protected static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster,