You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/10/13 23:30:32 UTC
[kafka] branch 2.7 updated: KAFKA-10573 Update connect transforms configs for KIP-629 (#9403)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new f3658c5 KAFKA-10573 Update connect transforms configs for KIP-629 (#9403)
f3658c5 is described below
commit f3658c58d0faf58f169104f92d4bbd9c8059ee56
Author: Xavier Léauté <xv...@apache.org>
AuthorDate: Tue Oct 13 16:13:44 2020 -0700
KAFKA-10573 Update connect transforms configs for KIP-629 (#9403)
Changes the Connect `ReplaceField` SMT's configuration properties, deprecating and replacing `blacklist` with `exclude`, and `whitelist` with `include`. The old configurations are still allowed (ensuring backward compatibility), but warning messages are written to the log to suggest users change to `include` and `exclude`.
This is part of KIP-629.
Author: Xavier Léauté <xv...@apache.org>
Reviewer: Randall Hauch <rh...@gmail.com>
---
.../kafka/connect/transforms/ReplaceField.java | 37 ++++++++++++-----
.../kafka/connect/transforms/ReplaceFieldTest.java | 47 ++++++++++++++++++++--
2 files changed, 69 insertions(+), 15 deletions(-)
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
index 3d9abc2..fb02577 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
@@ -20,7 +20,9 @@ import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.ConfigUtils;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -44,16 +46,25 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
+ "or value (<code>" + Value.class.getName() + "</code>).";
interface ConfigName {
- String BLACKLIST = "blacklist";
- String WHITELIST = "whitelist";
+ String EXCLUDE = "exclude";
+ String INCLUDE = "include";
+
+ // for backwards compatibility
+ String INCLUDE_ALIAS = "whitelist";
+ String EXCLUDE_ALIAS = "blacklist";
+
String RENAME = "renames";
}
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(ConfigName.BLACKLIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
- "Fields to exclude. This takes precedence over the whitelist.")
- .define(ConfigName.WHITELIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
+ .define(ConfigName.EXCLUDE, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
+ "Fields to exclude. This takes precedence over the fields to include.")
+ .define("blacklist", ConfigDef.Type.LIST, null, Importance.LOW,
+ "Deprecated. Use " + ConfigName.EXCLUDE + " instead.")
+ .define(ConfigName.INCLUDE, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
"Fields to include. If specified, only these fields will be used.")
+ .define("whitelist", ConfigDef.Type.LIST, null, Importance.LOW,
+ "Deprecated. Use " + ConfigName.INCLUDE + " instead.")
.define(ConfigName.RENAME, ConfigDef.Type.LIST, Collections.emptyList(), new ConfigDef.Validator() {
@SuppressWarnings("unchecked")
@Override
@@ -69,8 +80,8 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
private static final String PURPOSE = "field replacement";
- private List<String> blacklist;
- private List<String> whitelist;
+ private List<String> exclude;
+ private List<String> include;
private Map<String, String> renames;
private Map<String, String> reverseRenames;
@@ -78,9 +89,13 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
@Override
public void configure(Map<String, ?> configs) {
- final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
- blacklist = config.getList(ConfigName.BLACKLIST);
- whitelist = config.getList(ConfigName.WHITELIST);
+ final SimpleConfig config = new SimpleConfig(CONFIG_DEF, ConfigUtils.translateDeprecatedConfigs(configs, new String[][]{
+ {ConfigName.INCLUDE, "whitelist"},
+ {ConfigName.EXCLUDE, "blacklist"},
+ }));
+
+ exclude = config.getList(ConfigName.EXCLUDE);
+ include = config.getList(ConfigName.INCLUDE);
renames = parseRenameMappings(config.getList(ConfigName.RENAME));
reverseRenames = invert(renames);
@@ -108,7 +123,7 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
}
boolean filter(String fieldName) {
- return !blacklist.contains(fieldName) && (whitelist.isEmpty() || whitelist.contains(fieldName));
+ return !exclude.contains(fieldName) && (include.isEmpty() || include.contains(fieldName));
}
String renamed(String fieldName) {
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
index 9b43186..c8aac95 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -40,7 +40,7 @@ public class ReplaceFieldTest {
@Test
public void tombstoneSchemaless() {
final Map<String, String> props = new HashMap<>();
- props.put("whitelist", "abc,foo");
+ props.put("include", "abc,foo");
props.put("renames", "abc:xyz,foo:bar");
xform.configure(props);
@@ -55,7 +55,7 @@ public class ReplaceFieldTest {
@Test
public void tombstoneWithSchema() {
final Map<String, String> props = new HashMap<>();
- props.put("whitelist", "abc,foo");
+ props.put("include", "abc,foo");
props.put("renames", "abc:xyz,foo:bar");
xform.configure(props);
@@ -77,7 +77,7 @@ public class ReplaceFieldTest {
@Test
public void schemaless() {
final Map<String, String> props = new HashMap<>();
- props.put("blacklist", "dont");
+ props.put("exclude", "dont");
props.put("renames", "abc:xyz,foo:bar");
xform.configure(props);
@@ -101,7 +101,7 @@ public class ReplaceFieldTest {
@Test
public void withSchema() {
final Map<String, String> props = new HashMap<>();
- props.put("whitelist", "abc,foo");
+ props.put("include", "abc,foo");
props.put("renames", "abc:xyz,foo:bar");
xform.configure(props);
@@ -129,4 +129,43 @@ public class ReplaceFieldTest {
assertEquals(true, updatedValue.getBoolean("bar"));
}
+ @Test
+ public void testIncludeBackwardsCompatibility() {
+ final Map<String, String> props = new HashMap<>();
+ props.put("whitelist", "abc,foo");
+ props.put("renames", "abc:xyz,foo:bar");
+
+ xform.configure(props);
+
+ final SinkRecord record = new SinkRecord("test", 0, null, null, null, null, 0);
+ final SinkRecord transformedRecord = xform.apply(record);
+
+ assertNull(transformedRecord.value());
+ assertNull(transformedRecord.valueSchema());
+ }
+
+
+ @Test
+ public void testExcludeBackwardsCompatibility() {
+ final Map<String, String> props = new HashMap<>();
+ props.put("blacklist", "dont");
+ props.put("renames", "abc:xyz,foo:bar");
+
+ xform.configure(props);
+
+ final Map<String, Object> value = new HashMap<>();
+ value.put("dont", "whatever");
+ value.put("abc", 42);
+ value.put("foo", true);
+ value.put("etc", "etc");
+
+ final SinkRecord record = new SinkRecord("test", 0, null, null, null, value, 0);
+ final SinkRecord transformedRecord = xform.apply(record);
+
+ final Map updatedValue = (Map) transformedRecord.value();
+ assertEquals(3, updatedValue.size());
+ assertEquals(42, updatedValue.get("xyz"));
+ assertEquals(true, updatedValue.get("bar"));
+ assertEquals("etc", updatedValue.get("etc"));
+ }
}