You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/10/20 14:39:43 UTC
[kafka] branch 2.7 updated: KAFKA-10572 mirror-maker config changes
for KIP-629 (#9429)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 33c904c KAFKA-10572 mirror-maker config changes for KIP-629 (#9429)
33c904c is described below
commit 33c904cfa19a6bad6d3d291ef91f91c2655ac92a
Author: Xavier Léauté <xv...@apache.org>
AuthorDate: Tue Oct 20 07:15:14 2020 -0700
KAFKA-10572 mirror-maker config changes for KIP-629 (#9429)
Author: Xavier Léauté <xa...@confluent.io>
Reviewer: Randall Hauch <rh...@gmail.com>
---
connect/mirror/README.md | 2 +-
.../mirror/DefaultConfigPropertyFilter.java | 57 +++++++++-------
.../kafka/connect/mirror/DefaultGroupFilter.java | 75 ++++++++++++---------
.../kafka/connect/mirror/DefaultTopicFilter.java | 72 +++++++++++---------
.../connect/mirror/MirrorConnectorConfig.java | 78 ++++++++++++++--------
.../connect/mirror/MirrorConnectorConfigTest.java | 16 ++++-
.../connect/mirror/MirrorMakerConfigTest.java | 63 ++++++++++++++---
.../connect/mirror/MirrorSourceConnectorTest.java | 2 +-
8 files changed, 239 insertions(+), 126 deletions(-)
diff --git a/connect/mirror/README.md b/connect/mirror/README.md
index 770c542..9c7c259 100644
--- a/connect/mirror/README.md
+++ b/connect/mirror/README.md
@@ -29,7 +29,7 @@ specific topics or groups:
A->B.groups = group-1, group-2
By default, all topics and consumer groups are replicated (except
-blacklisted ones), across all enabled replication flows. Each
+excluded ones), across all enabled replication flows. Each
replication flow must be explicitly enabled to begin replication:
A->B.enabled = true
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
index f51db1c..0c85f50 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
@@ -20,58 +20,67 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.utils.ConfigUtils;
import java.util.Map;
import java.util.regex.Pattern;
-/** Uses a blacklist of property names or regexes. */
+/** Filters excluded property names or regexes. */
public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
- public static final String CONFIG_PROPERTIES_BLACKLIST_CONFIG = "config.properties.blacklist";
- private static final String CONFIG_PROPERTIES_BLACKLIST_DOC = "List of topic configuration properties and/or regexes "
- + "that should not be replicated.";
- public static final String CONFIG_PROPERTIES_BLACKLIST_DEFAULT = "follower\\.replication\\.throttled\\.replicas, "
- + "leader\\.replication\\.throttled\\.replicas, "
- + "message\\.timestamp\\.difference\\.max\\.ms, "
- + "message\\.timestamp\\.type, "
- + "unclean\\.leader\\.election\\.enable, "
- + "min\\.insync\\.replicas";
- private Pattern blacklistPattern = MirrorUtils.compilePatternList(CONFIG_PROPERTIES_BLACKLIST_DEFAULT);
+ public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude";
+ public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist";
+
+ private static final String CONFIG_PROPERTIES_EXCLUDE_DOC = "List of topic configuration properties and/or regexes "
+ + "that should not be replicated.";
+ public static final String CONFIG_PROPERTIES_EXCLUDE_DEFAULT = "follower\\.replication\\.throttled\\.replicas, "
+ + "leader\\.replication\\.throttled\\.replicas, "
+ + "message\\.timestamp\\.difference\\.max\\.ms, "
+ + "message\\.timestamp\\.type, "
+ + "unclean\\.leader\\.election\\.enable, "
+ + "min\\.insync\\.replicas";
+ private Pattern excludePattern = MirrorUtils.compilePatternList(CONFIG_PROPERTIES_EXCLUDE_DEFAULT);
@Override
public void configure(Map<String, ?> props) {
ConfigPropertyFilterConfig config = new ConfigPropertyFilterConfig(props);
- blacklistPattern = config.blacklistPattern();
+ excludePattern = config.excludePattern();
}
@Override
public void close() {
}
- private boolean blacklisted(String prop) {
- return blacklistPattern != null && blacklistPattern.matcher(prop).matches();
+ private boolean excluded(String prop) {
+ return excludePattern != null && excludePattern.matcher(prop).matches();
}
@Override
public boolean shouldReplicateConfigProperty(String prop) {
- return !blacklisted(prop);
+ return !excluded(prop);
}
static class ConfigPropertyFilterConfig extends AbstractConfig {
static final ConfigDef DEF = new ConfigDef()
- .define(CONFIG_PROPERTIES_BLACKLIST_CONFIG,
- Type.LIST,
- CONFIG_PROPERTIES_BLACKLIST_DEFAULT,
- Importance.HIGH,
- CONFIG_PROPERTIES_BLACKLIST_DOC);
+ .define(CONFIG_PROPERTIES_EXCLUDE_CONFIG,
+ Type.LIST,
+ CONFIG_PROPERTIES_EXCLUDE_DEFAULT,
+ Importance.HIGH,
+ CONFIG_PROPERTIES_EXCLUDE_DOC)
+ .define(CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG,
+ Type.LIST,
+ null,
+ Importance.HIGH,
+ "Deprecated. Use " + CONFIG_PROPERTIES_EXCLUDE_CONFIG + " instead.");
- ConfigPropertyFilterConfig(Map<?, ?> props) {
- super(DEF, props, false);
+ ConfigPropertyFilterConfig(Map<String, ?> props) {
+ super(DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
+ {CONFIG_PROPERTIES_EXCLUDE_CONFIG, CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG}}), false);
}
- Pattern blacklistPattern() {
- return MirrorUtils.compilePatternList(getList(CONFIG_PROPERTIES_BLACKLIST_CONFIG));
+ Pattern excludePattern() {
+ return MirrorUtils.compilePatternList(getList(CONFIG_PROPERTIES_EXCLUDE_CONFIG));
}
}
}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java
index acf5115..179067e 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java
@@ -20,72 +20,81 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.utils.ConfigUtils;
import java.util.Map;
import java.util.regex.Pattern;
-/** Uses a whitelist and blacklist. */
+/** Uses an include and exclude pattern. */
public class DefaultGroupFilter implements GroupFilter {
- public static final String GROUPS_WHITELIST_CONFIG = "groups";
- private static final String GROUPS_WHITELIST_DOC = "List of consumer group names and/or regexes to replicate.";
- public static final String GROUPS_WHITELIST_DEFAULT = ".*";
+ public static final String GROUPS_INCLUDE_CONFIG = "groups";
+ private static final String GROUPS_INCLUDE_DOC = "List of consumer group names and/or regexes to replicate.";
+ public static final String GROUPS_INCLUDE_DEFAULT = ".*";
- public static final String GROUPS_BLACKLIST_CONFIG = "groups.blacklist";
- private static final String GROUPS_BLACKLIST_DOC = "List of consumer group names and/or regexes that should not be replicated.";
- public static final String GROUPS_BLACKLIST_DEFAULT = "console-consumer-.*, connect-.*, __.*";
+ public static final String GROUPS_EXCLUDE_CONFIG = "groups.exclude";
+ public static final String GROUPS_EXCLUDE_CONFIG_ALIAS = "groups.blacklist";
- private Pattern whitelistPattern;
- private Pattern blacklistPattern;
+ private static final String GROUPS_EXCLUDE_DOC = "List of consumer group names and/or regexes that should not be replicated.";
+ public static final String GROUPS_EXCLUDE_DEFAULT = "console-consumer-.*, connect-.*, __.*";
+
+ private Pattern includePattern;
+ private Pattern excludePattern;
@Override
public void configure(Map<String, ?> props) {
GroupFilterConfig config = new GroupFilterConfig(props);
- whitelistPattern = config.whitelistPattern();
- blacklistPattern = config.blacklistPattern();
+ includePattern = config.includePattern();
+ excludePattern = config.excludePattern();
}
@Override
public void close() {
}
- private boolean whitelisted(String group) {
- return whitelistPattern != null && whitelistPattern.matcher(group).matches();
+ private boolean included(String group) {
+ return includePattern != null && includePattern.matcher(group).matches();
}
- private boolean blacklisted(String group) {
- return blacklistPattern != null && blacklistPattern.matcher(group).matches();
+ private boolean excluded(String group) {
+ return excludePattern != null && excludePattern.matcher(group).matches();
}
@Override
public boolean shouldReplicateGroup(String group) {
- return whitelisted(group) && !blacklisted(group);
+ return included(group) && !excluded(group);
}
static class GroupFilterConfig extends AbstractConfig {
static final ConfigDef DEF = new ConfigDef()
- .define(GROUPS_WHITELIST_CONFIG,
- Type.LIST,
- GROUPS_WHITELIST_DEFAULT,
- Importance.HIGH,
- GROUPS_WHITELIST_DOC)
- .define(GROUPS_BLACKLIST_CONFIG,
- Type.LIST,
- GROUPS_BLACKLIST_DEFAULT,
- Importance.HIGH,
- GROUPS_BLACKLIST_DOC);
-
- GroupFilterConfig(Map<?, ?> props) {
- super(DEF, props, false);
+ .define(GROUPS_INCLUDE_CONFIG,
+ Type.LIST,
+ GROUPS_INCLUDE_DEFAULT,
+ Importance.HIGH,
+ GROUPS_INCLUDE_DOC)
+ .define(GROUPS_EXCLUDE_CONFIG,
+ Type.LIST,
+ GROUPS_EXCLUDE_DEFAULT,
+ Importance.HIGH,
+ GROUPS_EXCLUDE_DOC)
+ .define(GROUPS_EXCLUDE_CONFIG_ALIAS,
+ Type.LIST,
+ null,
+ Importance.HIGH,
+ "Deprecated. Use " + GROUPS_EXCLUDE_CONFIG + " instead.");
+
+ GroupFilterConfig(Map<String, ?> props) {
+ super(DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
+ {GROUPS_EXCLUDE_CONFIG, GROUPS_EXCLUDE_CONFIG_ALIAS}}), false);
}
- Pattern whitelistPattern() {
- return MirrorUtils.compilePatternList(getList(GROUPS_WHITELIST_CONFIG));
+ Pattern includePattern() {
+ return MirrorUtils.compilePatternList(getList(GROUPS_INCLUDE_CONFIG));
}
- Pattern blacklistPattern() {
- return MirrorUtils.compilePatternList(getList(GROUPS_BLACKLIST_CONFIG));
+ Pattern excludePattern() {
+ return MirrorUtils.compilePatternList(getList(GROUPS_EXCLUDE_CONFIG));
}
}
}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
index 308bdbf..f808ce8 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
@@ -20,72 +20,80 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.utils.ConfigUtils;
import java.util.Map;
import java.util.regex.Pattern;
-/** Uses a whitelist and blacklist. */
+/** Uses an include and exclude pattern. */
public class DefaultTopicFilter implements TopicFilter {
- public static final String TOPICS_WHITELIST_CONFIG = "topics";
- private static final String TOPICS_WHITELIST_DOC = "List of topics and/or regexes to replicate.";
- public static final String TOPICS_WHITELIST_DEFAULT = ".*";
+ public static final String TOPICS_INCLUDE_CONFIG = "topics";
+ private static final String TOPICS_INCLUDE_DOC = "List of topics and/or regexes to replicate.";
+ public static final String TOPICS_INCLUDE_DEFAULT = ".*";
- public static final String TOPICS_BLACKLIST_CONFIG = "topics.blacklist";
- private static final String TOPICS_BLACKLIST_DOC = "List of topics and/or regexes that should not be replicated.";
- public static final String TOPICS_BLACKLIST_DEFAULT = ".*[\\-\\.]internal, .*\\.replica, __.*";
+ public static final String TOPICS_EXCLUDE_CONFIG = "topics.exclude";
+ public static final String TOPICS_EXCLUDE_CONFIG_ALIAS = "topics.blacklist";
+ private static final String TOPICS_EXCLUDE_DOC = "List of topics and/or regexes that should not be replicated.";
+ public static final String TOPICS_EXCLUDE_DEFAULT = ".*[\\-\\.]internal, .*\\.replica, __.*";
- private Pattern whitelistPattern;
- private Pattern blacklistPattern;
+ private Pattern includePattern;
+ private Pattern excludePattern;
@Override
public void configure(Map<String, ?> props) {
TopicFilterConfig config = new TopicFilterConfig(props);
- whitelistPattern = config.whitelistPattern();
- blacklistPattern = config.blacklistPattern();
+ includePattern = config.includePattern();
+ excludePattern = config.excludePattern();
}
@Override
public void close() {
}
- private boolean whitelisted(String topic) {
- return whitelistPattern != null && whitelistPattern.matcher(topic).matches();
+ private boolean included(String topic) {
+ return includePattern != null && includePattern.matcher(topic).matches();
}
- private boolean blacklisted(String topic) {
- return blacklistPattern != null && blacklistPattern.matcher(topic).matches();
+ private boolean excluded(String topic) {
+ return excludePattern != null && excludePattern.matcher(topic).matches();
}
@Override
public boolean shouldReplicateTopic(String topic) {
- return whitelisted(topic) && !blacklisted(topic);
+ return included(topic) && !excluded(topic);
}
static class TopicFilterConfig extends AbstractConfig {
static final ConfigDef DEF = new ConfigDef()
- .define(TOPICS_WHITELIST_CONFIG,
- Type.LIST,
- TOPICS_WHITELIST_DEFAULT,
- Importance.HIGH,
- TOPICS_WHITELIST_DOC)
- .define(TOPICS_BLACKLIST_CONFIG,
- Type.LIST,
- TOPICS_BLACKLIST_DEFAULT,
- Importance.HIGH,
- TOPICS_BLACKLIST_DOC);
+ .define(TOPICS_INCLUDE_CONFIG,
+ Type.LIST,
+ TOPICS_INCLUDE_DEFAULT,
+ Importance.HIGH,
+ TOPICS_INCLUDE_DOC)
+ .define(TOPICS_EXCLUDE_CONFIG,
+ Type.LIST,
+ TOPICS_EXCLUDE_DEFAULT,
+ Importance.HIGH,
+ TOPICS_EXCLUDE_DOC)
+ .define(TOPICS_EXCLUDE_CONFIG_ALIAS,
+ Type.LIST,
+ null,
+ Importance.HIGH,
+ "Deprecated. Use " + TOPICS_EXCLUDE_CONFIG + " instead.");
- TopicFilterConfig(Map<?, ?> props) {
- super(DEF, props, false);
+ TopicFilterConfig(Map<String, ?> props) {
+ super(DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
+ {TOPICS_EXCLUDE_CONFIG, TOPICS_EXCLUDE_CONFIG_ALIAS}}), false);
}
- Pattern whitelistPattern() {
- return MirrorUtils.compilePatternList(getList(TOPICS_WHITELIST_CONFIG));
+ Pattern includePattern() {
+ return MirrorUtils.compilePatternList(getList(TOPICS_INCLUDE_CONFIG));
}
- Pattern blacklistPattern() {
- return MirrorUtils.compilePatternList(getList(TOPICS_BLACKLIST_CONFIG));
+ Pattern excludePattern() {
+ return MirrorUtils.compilePatternList(getList(TOPICS_EXCLUDE_CONFIG));
}
}
}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index 9b30bc5..0616212 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.utils.ConfigUtils;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
@@ -90,24 +91,28 @@ public class MirrorConnectorConfig extends AbstractConfig {
public static final String REPLICATION_FACTOR = "replication.factor";
private static final String REPLICATION_FACTOR_DOC = "Replication factor for newly created remote topics.";
public static final int REPLICATION_FACTOR_DEFAULT = 2;
- public static final String TOPICS = DefaultTopicFilter.TOPICS_WHITELIST_CONFIG;
- public static final String TOPICS_DEFAULT = DefaultTopicFilter.TOPICS_WHITELIST_DEFAULT;
+ public static final String TOPICS = DefaultTopicFilter.TOPICS_INCLUDE_CONFIG;
+ public static final String TOPICS_DEFAULT = DefaultTopicFilter.TOPICS_INCLUDE_DEFAULT;
private static final String TOPICS_DOC = "Topics to replicate. Supports comma-separated topic names and regexes.";
- public static final String TOPICS_BLACKLIST = DefaultTopicFilter.TOPICS_BLACKLIST_CONFIG;
- public static final String TOPICS_BLACKLIST_DEFAULT = DefaultTopicFilter.TOPICS_BLACKLIST_DEFAULT;
- private static final String TOPICS_BLACKLIST_DOC = "Blacklisted topics. Supports comma-separated topic names and regexes."
- + " Blacklists take precedence over whitelists.";
- public static final String GROUPS = DefaultGroupFilter.GROUPS_WHITELIST_CONFIG;
- public static final String GROUPS_DEFAULT = DefaultGroupFilter.GROUPS_WHITELIST_DEFAULT;
+ public static final String TOPICS_EXCLUDE = DefaultTopicFilter.TOPICS_EXCLUDE_CONFIG;
+ public static final String TOPICS_EXCLUDE_ALIAS = DefaultTopicFilter.TOPICS_EXCLUDE_CONFIG_ALIAS;
+ public static final String TOPICS_EXCLUDE_DEFAULT = DefaultTopicFilter.TOPICS_EXCLUDE_DEFAULT;
+ private static final String TOPICS_EXCLUDE_DOC = "Excluded topics. Supports comma-separated topic names and regexes."
+ + " Excludes take precedence over includes.";
+ public static final String GROUPS = DefaultGroupFilter.GROUPS_INCLUDE_CONFIG;
+ public static final String GROUPS_DEFAULT = DefaultGroupFilter.GROUPS_INCLUDE_DEFAULT;
private static final String GROUPS_DOC = "Consumer groups to replicate. Supports comma-separated group IDs and regexes.";
- public static final String GROUPS_BLACKLIST = DefaultGroupFilter.GROUPS_BLACKLIST_CONFIG;
- public static final String GROUPS_BLACKLIST_DEFAULT = DefaultGroupFilter.GROUPS_BLACKLIST_DEFAULT;
- private static final String GROUPS_BLACKLIST_DOC = "Blacklisted groups. Supports comma-separated group IDs and regexes."
- + " Blacklists take precedence over whitelists.";
- public static final String CONFIG_PROPERTIES_BLACKLIST = DefaultConfigPropertyFilter.CONFIG_PROPERTIES_BLACKLIST_CONFIG;
- public static final String CONFIG_PROPERTIES_BLACKLIST_DEFAULT = DefaultConfigPropertyFilter.CONFIG_PROPERTIES_BLACKLIST_DEFAULT;
- private static final String CONFIG_PROPERTIES_BLACKLIST_DOC = "Topic config properties that should not be replicated. Supports "
- + "comma-separated property names and regexes.";
+ public static final String GROUPS_EXCLUDE = DefaultGroupFilter.GROUPS_EXCLUDE_CONFIG;
+ public static final String GROUPS_EXCLUDE_ALIAS = DefaultGroupFilter.GROUPS_EXCLUDE_CONFIG_ALIAS;
+
+ public static final String GROUPS_EXCLUDE_DEFAULT = DefaultGroupFilter.GROUPS_EXCLUDE_DEFAULT;
+ private static final String GROUPS_EXCLUDE_DOC = "Exclude groups. Supports comma-separated group IDs and regexes."
+ + " Excludes take precedence over includes.";
+ public static final String CONFIG_PROPERTIES_EXCLUDE = DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG;
+ public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS = DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG;
+ public static final String CONFIG_PROPERTIES_EXCLUDE_DEFAULT = DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_DEFAULT;
+ private static final String CONFIG_PROPERTIES_EXCLUDE_DOC = "Topic config properties that should not be replicated. Supports "
+ + "comma-separated property names and regexes.";
public static final String HEARTBEATS_TOPIC_REPLICATION_FACTOR = "heartbeats.topic.replication.factor";
public static final String HEARTBEATS_TOPIC_REPLICATION_FACTOR_DOC = "Replication factor for heartbeats topic.";
@@ -206,7 +211,10 @@ public class MirrorConnectorConfig extends AbstractConfig {
protected static final String TARGET_ADMIN_CLIENT_PREFIX = "target.admin.";
public MirrorConnectorConfig(Map<String, String> props) {
- this(CONNECTOR_CONFIG_DEF, props);
+ this(CONNECTOR_CONFIG_DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
+ {TOPICS_EXCLUDE, TOPICS_EXCLUDE_ALIAS},
+ {GROUPS_EXCLUDE, GROUPS_EXCLUDE_ALIAS},
+ {CONFIG_PROPERTIES_EXCLUDE, CONFIG_PROPERTIES_EXCLUDE_ALIAS}}));
}
protected MirrorConnectorConfig(ConfigDef configDef, Map<String, String> props) {
@@ -436,11 +444,17 @@ public class MirrorConnectorConfig extends AbstractConfig {
ConfigDef.Importance.HIGH,
TOPICS_DOC)
.define(
- TOPICS_BLACKLIST,
+ TOPICS_EXCLUDE,
ConfigDef.Type.LIST,
- TOPICS_BLACKLIST_DEFAULT,
+ TOPICS_EXCLUDE_DEFAULT,
ConfigDef.Importance.HIGH,
- TOPICS_BLACKLIST_DOC)
+ TOPICS_EXCLUDE_DOC)
+ .define(
+ TOPICS_EXCLUDE_ALIAS,
+ ConfigDef.Type.LIST,
+ null,
+ ConfigDef.Importance.HIGH,
+ "Deprecated. Use " + TOPICS_EXCLUDE + " instead.")
.define(
GROUPS,
ConfigDef.Type.LIST,
@@ -448,17 +462,29 @@ public class MirrorConnectorConfig extends AbstractConfig {
ConfigDef.Importance.HIGH,
GROUPS_DOC)
.define(
- GROUPS_BLACKLIST,
+ GROUPS_EXCLUDE,
+ ConfigDef.Type.LIST,
+ GROUPS_EXCLUDE_DEFAULT,
+ ConfigDef.Importance.HIGH,
+ GROUPS_EXCLUDE_DOC)
+ .define(
+ GROUPS_EXCLUDE_ALIAS,
+ ConfigDef.Type.LIST,
+ null,
+ ConfigDef.Importance.HIGH,
+ "Deprecated. Use " + GROUPS_EXCLUDE + " instead.")
+ .define(
+ CONFIG_PROPERTIES_EXCLUDE,
ConfigDef.Type.LIST,
- GROUPS_BLACKLIST_DEFAULT,
+ CONFIG_PROPERTIES_EXCLUDE_DEFAULT,
ConfigDef.Importance.HIGH,
- GROUPS_BLACKLIST_DOC)
+ CONFIG_PROPERTIES_EXCLUDE_DOC)
.define(
- CONFIG_PROPERTIES_BLACKLIST,
+ CONFIG_PROPERTIES_EXCLUDE_ALIAS,
ConfigDef.Type.LIST,
- CONFIG_PROPERTIES_BLACKLIST_DEFAULT,
+ null,
ConfigDef.Importance.HIGH,
- CONFIG_PROPERTIES_BLACKLIST_DOC)
+ "Deprecated. Use " + CONFIG_PROPERTIES_EXCLUDE + " instead.")
.define(
TOPIC_FILTER_CLASS,
ConfigDef.Type.CLASS,
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
index f7db542..e2eb241 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
@@ -69,12 +69,26 @@ public class MirrorConnectorConfigTest {
@Test
public void testConfigPropertyMatching() {
MirrorConnectorConfig config = new MirrorConnectorConfig(
- makeProps("config.properties.blacklist", "prop2"));
+ makeProps("config.properties.exclude", "prop2"));
assertTrue(config.configPropertyFilter().shouldReplicateConfigProperty("prop1"));
assertFalse(config.configPropertyFilter().shouldReplicateConfigProperty("prop2"));
}
@Test
+ public void testConfigBackwardsCompatibility() {
+ MirrorConnectorConfig config = new MirrorConnectorConfig(
+ makeProps("config.properties.blacklist", "prop1",
+ "groups.blacklist", "group-1",
+ "topics.blacklist", "topic-1"));
+ assertFalse(config.configPropertyFilter().shouldReplicateConfigProperty("prop1"));
+ assertTrue(config.configPropertyFilter().shouldReplicateConfigProperty("prop2"));
+ assertFalse(config.topicFilter().shouldReplicateTopic("topic-1"));
+ assertTrue(config.topicFilter().shouldReplicateTopic("topic-2"));
+ assertFalse(config.groupFilter().shouldReplicateGroup("group-1"));
+ assertTrue(config.groupFilter().shouldReplicateGroup("group-2"));
+ }
+
+ @Test
public void testNoTopics() {
MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", ""));
assertFalse(config.topicFilter().shouldReplicateTopic("topic1"));
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
index b618e37..1cba87f 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -127,7 +127,7 @@ public class MirrorMakerConfigTest {
"topics", "topic-1",
"groups", "group-2",
"replication.policy.separator", "__",
- "config.properties.blacklist", "property-3",
+ "config.properties.exclude", "property-3",
"metric.reporters", "FakeMetricsReporter",
"topic.filter.class", DefaultTopicFilter.class.getName(),
"xxx", "yyy"));
@@ -137,12 +137,12 @@ public class MirrorMakerConfigTest {
MirrorConnectorConfig connectorConfig = new MirrorConnectorConfig(connectorProps);
assertEquals("Connector properties like tasks.max should be passed through to underlying Connectors.",
100, (int) connectorConfig.getInt("tasks.max"));
- assertEquals("Topics whitelist should be passed through to underlying Connectors.",
+ assertEquals("Topics include should be passed through to underlying Connectors.",
Arrays.asList("topic-1"), connectorConfig.getList("topics"));
- assertEquals("Groups whitelist should be passed through to underlying Connectors.",
+ assertEquals("Groups include should be passed through to underlying Connectors.",
Arrays.asList("group-2"), connectorConfig.getList("groups"));
- assertEquals("Config properties blacklist should be passed through to underlying Connectors.",
- Arrays.asList("property-3"), connectorConfig.getList("config.properties.blacklist"));
+ assertEquals("Config properties exclude should be passed through to underlying Connectors.",
+ Arrays.asList("property-3"), connectorConfig.getList("config.properties.exclude"));
assertEquals("Metrics reporters should be passed through to underlying Connectors.",
Arrays.asList("FakeMetricsReporter"), connectorConfig.getList("metric.reporters"));
assertEquals("Filters should be passed through to underlying Connectors.",
@@ -154,11 +154,58 @@ public class MirrorMakerConfigTest {
}
@Test
+ public void testConfigBackwardsCompatibility() {
+ MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+ "clusters", "a, b",
+ "groups.blacklist", "group-7",
+ "topics.blacklist", "topic3",
+ "config.properties.blacklist", "property-3",
+ "topic.filter.class", DefaultTopicFilter.class.getName()));
+ SourceAndTarget sourceAndTarget = new SourceAndTarget("source", "target");
+ Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(sourceAndTarget,
+ MirrorSourceConnector.class);
+ MirrorConnectorConfig connectorConfig = new MirrorConnectorConfig(connectorProps);
+ DefaultTopicFilter.TopicFilterConfig filterConfig =
+ new DefaultTopicFilter.TopicFilterConfig(connectorProps);
+
+ assertEquals("Topics exclude should be backwards compatible.",
+ Arrays.asList("topic3"), filterConfig.getList("topics.exclude"));
+
+ assertEquals("Groups exclude should be backwards compatible.",
+ Arrays.asList("group-7"), connectorConfig.getList("groups.exclude"));
+
+ assertEquals("Config properties exclude should be backwards compatible.",
+ Arrays.asList("property-3"), connectorConfig.getList("config.properties.exclude"));
+
+ }
+
+ @Test
+ public void testConfigBackwardsCompatibilitySourceTarget() {
+ MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+ "clusters", "a, b",
+ "source->target.topics.blacklist", "topic3",
+ "source->target.groups.blacklist", "group-7",
+ "topic.filter.class", DefaultTopicFilter.class.getName()));
+ SourceAndTarget sourceAndTarget = new SourceAndTarget("source", "target");
+ Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(sourceAndTarget,
+ MirrorSourceConnector.class);
+ MirrorConnectorConfig connectorConfig = new MirrorConnectorConfig(connectorProps);
+ DefaultTopicFilter.TopicFilterConfig filterConfig =
+ new DefaultTopicFilter.TopicFilterConfig(connectorProps);
+
+ assertEquals("Topics exclude should be backwards compatible.",
+ Arrays.asList("topic3"), filterConfig.getList("topics.exclude"));
+
+ assertEquals("Groups exclude should be backwards compatible.",
+ Arrays.asList("group-7"), connectorConfig.getList("groups.exclude"));
+ }
+
+ @Test
public void testIncludesTopicFilterProperties() {
MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
"clusters", "a, b",
"source->target.topics", "topic1, topic2",
- "source->target.topics.blacklist", "topic3"));
+ "source->target.topics.exclude", "topic3"));
SourceAndTarget sourceAndTarget = new SourceAndTarget("source", "target");
Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(sourceAndTarget,
MirrorSourceConnector.class);
@@ -166,8 +213,8 @@ public class MirrorMakerConfigTest {
new DefaultTopicFilter.TopicFilterConfig(connectorProps);
assertEquals("source->target.topics should be passed through to TopicFilters.",
Arrays.asList("topic1", "topic2"), filterConfig.getList("topics"));
- assertEquals("source->target.topics.blacklist should be passed through to TopicFilters.",
- Arrays.asList("topic3"), filterConfig.getList("topics.blacklist"));
+ assertEquals("source->target.topics.exclude should be passed through to TopicFilters.",
+ Arrays.asList("topic3"), filterConfig.getList("topics.exclude"));
}
@Test
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index c915845..258dc19 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -129,7 +129,7 @@ public class MirrorSourceConnectorTest {
Config targetConfig = connector.targetConfig(config);
assertTrue("should replicate properties", targetConfig.entries().stream()
.anyMatch(x -> x.name().equals("name-1")));
- assertFalse("should not replicate blacklisted properties", targetConfig.entries().stream()
+ assertFalse("should not replicate excluded properties", targetConfig.entries().stream()
.anyMatch(x -> x.name().equals("min.insync.replicas")));
}