You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/10/13 23:30:32 UTC

[kafka] branch 2.7 updated: KAFKA-10573 Update connect transforms configs for KIP-629 (#9403)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new f3658c5  KAFKA-10573 Update connect transforms configs for KIP-629 (#9403)
f3658c5 is described below

commit f3658c58d0faf58f169104f92d4bbd9c8059ee56
Author: Xavier Léauté <xv...@apache.org>
AuthorDate: Tue Oct 13 16:13:44 2020 -0700

    KAFKA-10573 Update connect transforms configs for KIP-629 (#9403)
    
    Changes the configuration properties of the Connect `ReplaceField` SMT: `blacklist` is deprecated in favor of `exclude`, and `whitelist` is deprecated in favor of `include`. The old property names are still accepted (ensuring backward compatibility), but warning messages are written to the log suggesting that users switch to `include` and `exclude`.
    
    This is part of KIP-629.
    
    Author: Xavier Léauté <xv...@apache.org>
    Reviewer: Randall Hauch <rh...@gmail.com>
---
 .../kafka/connect/transforms/ReplaceField.java     | 37 ++++++++++++-----
 .../kafka/connect/transforms/ReplaceFieldTest.java | 47 ++++++++++++++++++++--
 2 files changed, 69 insertions(+), 15 deletions(-)

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
index 3d9abc2..fb02577 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
@@ -20,7 +20,9 @@ import org.apache.kafka.common.cache.Cache;
 import org.apache.kafka.common.cache.LRUCache;
 import org.apache.kafka.common.cache.SynchronizedCache;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.ConfigUtils;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
@@ -44,16 +46,25 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
             + "or value (<code>" + Value.class.getName() + "</code>).";
 
     interface ConfigName {
-        String BLACKLIST = "blacklist";
-        String WHITELIST = "whitelist";
+        String EXCLUDE = "exclude";
+        String INCLUDE = "include";
+
+        // for backwards compatibility
+        String INCLUDE_ALIAS = "whitelist";
+        String EXCLUDE_ALIAS = "blacklist";
+
         String RENAME = "renames";
     }
 
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(ConfigName.BLACKLIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
-                    "Fields to exclude. This takes precedence over the whitelist.")
-            .define(ConfigName.WHITELIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
+            .define(ConfigName.EXCLUDE, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
+                    "Fields to exclude. This takes precedence over the fields to include.")
+            .define("blacklist", ConfigDef.Type.LIST, null, Importance.LOW,
+                    "Deprecated. Use " + ConfigName.EXCLUDE + " instead.")
+            .define(ConfigName.INCLUDE, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
                     "Fields to include. If specified, only these fields will be used.")
+            .define("whitelist", ConfigDef.Type.LIST, null, Importance.LOW,
+                    "Deprecated. Use " + ConfigName.INCLUDE + " instead.")
             .define(ConfigName.RENAME, ConfigDef.Type.LIST, Collections.emptyList(), new ConfigDef.Validator() {
                 @SuppressWarnings("unchecked")
                 @Override
@@ -69,8 +80,8 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
 
     private static final String PURPOSE = "field replacement";
 
-    private List<String> blacklist;
-    private List<String> whitelist;
+    private List<String> exclude;
+    private List<String> include;
     private Map<String, String> renames;
     private Map<String, String> reverseRenames;
 
@@ -78,9 +89,13 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
 
     @Override
     public void configure(Map<String, ?> configs) {
-        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
-        blacklist = config.getList(ConfigName.BLACKLIST);
-        whitelist = config.getList(ConfigName.WHITELIST);
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, ConfigUtils.translateDeprecatedConfigs(configs, new String[][]{
+            {ConfigName.INCLUDE, "whitelist"},
+            {ConfigName.EXCLUDE, "blacklist"},
+        }));
+
+        exclude = config.getList(ConfigName.EXCLUDE);
+        include = config.getList(ConfigName.INCLUDE);
         renames = parseRenameMappings(config.getList(ConfigName.RENAME));
         reverseRenames = invert(renames);
 
@@ -108,7 +123,7 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
     }
 
     boolean filter(String fieldName) {
-        return !blacklist.contains(fieldName) && (whitelist.isEmpty() || whitelist.contains(fieldName));
+        return !exclude.contains(fieldName) && (include.isEmpty() || include.contains(fieldName));
     }
 
     String renamed(String fieldName) {
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
index 9b43186..c8aac95 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -40,7 +40,7 @@ public class ReplaceFieldTest {
     @Test
     public void tombstoneSchemaless() {
         final Map<String, String> props = new HashMap<>();
-        props.put("whitelist", "abc,foo");
+        props.put("include", "abc,foo");
         props.put("renames", "abc:xyz,foo:bar");
 
         xform.configure(props);
@@ -55,7 +55,7 @@ public class ReplaceFieldTest {
     @Test
     public void tombstoneWithSchema() {
         final Map<String, String> props = new HashMap<>();
-        props.put("whitelist", "abc,foo");
+        props.put("include", "abc,foo");
         props.put("renames", "abc:xyz,foo:bar");
 
         xform.configure(props);
@@ -77,7 +77,7 @@ public class ReplaceFieldTest {
     @Test
     public void schemaless() {
         final Map<String, String> props = new HashMap<>();
-        props.put("blacklist", "dont");
+        props.put("exclude", "dont");
         props.put("renames", "abc:xyz,foo:bar");
 
         xform.configure(props);
@@ -101,7 +101,7 @@ public class ReplaceFieldTest {
     @Test
     public void withSchema() {
         final Map<String, String> props = new HashMap<>();
-        props.put("whitelist", "abc,foo");
+        props.put("include", "abc,foo");
         props.put("renames", "abc:xyz,foo:bar");
 
         xform.configure(props);
@@ -129,4 +129,43 @@ public class ReplaceFieldTest {
         assertEquals(true, updatedValue.getBoolean("bar"));
     }
 
+    @Test
+    public void testIncludeBackwardsCompatibility() {
+        final Map<String, String> props = new HashMap<>();
+        props.put("whitelist", "abc,foo");
+        props.put("renames", "abc:xyz,foo:bar");
+
+        xform.configure(props);
+
+        final SinkRecord record = new SinkRecord("test", 0, null, null, null, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertNull(transformedRecord.value());
+        assertNull(transformedRecord.valueSchema());
+    }
+
+
+    @Test
+    public void testExcludeBackwardsCompatibility() {
+        final Map<String, String> props = new HashMap<>();
+        props.put("blacklist", "dont");
+        props.put("renames", "abc:xyz,foo:bar");
+
+        xform.configure(props);
+
+        final Map<String, Object> value = new HashMap<>();
+        value.put("dont", "whatever");
+        value.put("abc", 42);
+        value.put("foo", true);
+        value.put("etc", "etc");
+
+        final SinkRecord record = new SinkRecord("test", 0, null, null, null, value, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        final Map updatedValue = (Map) transformedRecord.value();
+        assertEquals(3, updatedValue.size());
+        assertEquals(42, updatedValue.get("xyz"));
+        assertEquals(true, updatedValue.get("bar"));
+        assertEquals("etc", updatedValue.get("etc"));
+    }
 }