You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2021/11/15 10:47:36 UTC
[kafka] branch trunk updated: KAFKA-13255: Use config.properties.exclude when mirroring topics (#11401)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8318786 KAFKA-13255: Use config.properties.exclude when mirroring topics (#11401)
8318786 is described below
commit 8318786b72ccc1e74538aa99f3c5b1c21258bbfb
Author: Ed B <bd...@gmail.com>
AuthorDate: Mon Nov 15 05:45:48 2021 -0500
KAFKA-13255: Use config.properties.exclude when mirroring topics (#11401)
Reviewers: Mickael Maison <mi...@gmail.com>
---
.../connect/mirror/MirrorSourceConnector.java | 5 ++-
.../connect/mirror/MirrorSourceConnectorTest.java | 46 +++++++++++++++++++++
.../IdentityReplicationIntegrationTest.java | 9 +++++
.../MirrorConnectorsIntegrationBaseTest.java | 47 +++++++++++++++++++++-
4 files changed, 104 insertions(+), 3 deletions(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index b2cbe02..ee67330 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -349,14 +349,15 @@ public class MirrorSourceConnector extends SourceConnector {
}
}
- private void createNewTopics(Set<String> newSourceTopics, Map<String, Long> sourceTopicToPartitionCounts)
+ // visible for testing
+ void createNewTopics(Set<String> newSourceTopics, Map<String, Long> sourceTopicToPartitionCounts)
throws ExecutionException, InterruptedException {
Map<String, Config> sourceTopicToConfig = describeTopicConfigs(newSourceTopics);
Map<String, NewTopic> newTopics = newSourceTopics.stream()
.map(sourceTopic -> {
String remoteTopic = formatRemoteTopic(sourceTopic);
int partitionCount = sourceTopicToPartitionCounts.get(sourceTopic).intValue();
- Map<String, String> configs = configToMap(sourceTopicToConfig.get(sourceTopic));
+ Map<String, String> configs = configToMap(targetConfig(sourceTopicToConfig.get(sourceTopic)));
return new NewTopic(remoteTopic, partitionCount, (short) replicationFactor)
.configs(configs);
})
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index b4c8ca6..7daa96b 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -36,7 +36,10 @@ import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
@@ -153,6 +156,49 @@ public class MirrorSourceConnectorTest {
}
@Test
+ public void testNewTopicConfigs() throws Exception {
+ Map<String, Object> filterConfig = new HashMap<>();
+ filterConfig.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "follower\\.replication\\.throttled\\.replicas, "
+ + "leader\\.replication\\.throttled\\.replicas, "
+ + "message\\.timestamp\\.difference\\.max\\.ms, "
+ + "message\\.timestamp\\.type, "
+ + "unclean\\.leader\\.election\\.enable, "
+ + "min\\.insync\\.replicas,"
+ + "exclude_param.*");
+ DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter();
+ filter.configure(filterConfig);
+
+ MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ new DefaultReplicationPolicy(), x -> true, filter));
+
+ final String topic = "testtopic";
+ List<ConfigEntry> entries = new ArrayList<>();
+ entries.add(new ConfigEntry("name-1", "value-1"));
+ entries.add(new ConfigEntry("exclude_param.param1", "value-param1"));
+ entries.add(new ConfigEntry("min.insync.replicas", "2"));
+ Config config = new Config(entries);
+ doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
+ doAnswer(invocation -> {
+ Map<String, NewTopic> newTopics = invocation.getArgument(0);
+ assertNotNull(newTopics.get("source." + topic));
+ Map<String, String> targetConfig = newTopics.get("source." + topic).configs();
+
+ // property 'name-1' isn't defined in the exclude filter -> should be replicated
+ assertNotNull(targetConfig.get("name-1"), "should replicate properties");
+
+ // this property is in default list, just double check it:
+ String prop1 = "min.insync.replicas";
+ assertNull(targetConfig.get(prop1), "should not replicate excluded properties " + prop1);
+ // this property is only in exclude filter custom parameter, also tests regex on the way:
+ String prop2 = "exclude_param.param1";
+ assertNull(targetConfig.get(prop2), "should not replicate excluded properties " + prop2);
+ return null;
+ }).when(connector).createNewTopics(any());
+ connector.createNewTopics(Collections.singleton(topic), Collections.singletonMap(topic, 1L));
+ verify(connector).createNewTopics(any(), any());
+ }
+
+ @Test
public void testMirrorSourceConnectorTaskConfig() {
List<TopicPartition> knownSourceTopicPartitions = new ArrayList<>();
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index 43b1fcb..9e60e48 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -86,6 +86,7 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
waitForTopicCreated(backup, "test-topic-1");
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG),
"topic config was not synced");
+ createAndTestNewTopicWithConfigFilter();
assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
"Records were not produced to primary cluster.");
@@ -260,4 +261,12 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
assertEquals(0, records.count(), "consumer record size is not zero");
backupConsumer.close();
}
+
+ /*
+ * Returns expected topic name on target cluster.
+ */
+ @Override
+ String backupClusterTopicName(String topic) {
+ return topic;
+ }
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 6fb7a81..8f692ca 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
import org.apache.kafka.connect.mirror.MirrorClient;
import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
import org.apache.kafka.connect.mirror.MirrorMakerConfig;
@@ -60,6 +61,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -155,6 +157,9 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
mm2Props.putAll(basicMM2Config());
mm2Props.putAll(additionalMM2Config);
+ // exclude topic config:
+ mm2Props.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "delete\\.retention\\..*");
+
mm2Config = new MirrorMakerConfig(mm2Props);
primaryWorkerProps = mm2Config.workerConfig(new SourceAndTarget(BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS));
backupWorkerProps.putAll(mm2Config.workerConfig(new SourceAndTarget(PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS)));
@@ -259,7 +264,8 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
waitForTopicCreated(primary, "mm2-offset-syncs.backup.internal");
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "primary.test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG),
"topic config was not synced");
-
+ createAndTestNewTopicWithConfigFilter();
+
assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
"Records were not produced to primary cluster.");
assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1").count(),
@@ -429,6 +435,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
// one way replication from primary to backup
mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
+
mm2Config = new MirrorMakerConfig(mm2Props);
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
@@ -520,6 +527,44 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
}
/*
+ * Run tests for Exclude Filter for copying topic configurations
+ */
+ void createAndTestNewTopicWithConfigFilter() throws Exception {
+ // create topic with configuration to test:
+ final Map<String, String> topicConfig = new HashMap<>();
+ topicConfig.put("delete.retention.ms", "1000"); // should be excluded (default value is 86400000)
+ topicConfig.put("retention.bytes", "1000"); // should be included, default value is -1
+
+ final String topic = "test-topic-with-config";
+ final String backupTopic = backupClusterTopicName(topic);
+
+ primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig);
+ waitForTopicCreated(backup, backupTopic);
+
+ String primaryConfig, backupConfig;
+
+ primaryConfig = getTopicConfig(primary.kafka(), topic, "delete.retention.ms");
+ backupConfig = getTopicConfig(backup.kafka(), backupTopic, "delete.retention.ms");
+ assertNotEquals(primaryConfig, backupConfig,
+ "`delete.retention.ms` should be different, because it's in exclude filter! ");
+
+ // regression test for the config that are still supposed to be replicated
+ primaryConfig = getTopicConfig(primary.kafka(), topic, "retention.bytes");
+ backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes");
+ assertEquals(primaryConfig, backupConfig,
+ "`retention.bytes` should be the same, because it isn't in exclude filter! ");
+ assertEquals("1000", backupConfig,
+ "`retention.bytes` should be the same, because it's explicitly defined! ");
+ }
+
+ /*
+ * Returns expected topic name on target cluster.
+ */
+ String backupClusterTopicName(String topic) {
+ return PRIMARY_CLUSTER_ALIAS + "." + topic;
+ }
+
+ /*
* launch the connectors on kafka connect cluster and check if they are running
*/
protected static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster,