You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/01 23:46:16 UTC

[GitHub] [kafka] kkonstantine commented on a change in pull request #8755: KAFKA-10069 The user-defined "predicate" and "negate" are not removed…

kkonstantine commented on a change in pull request #8755:
URL: https://github.com/apache/kafka/pull/8755#discussion_r433538453



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
##########
@@ -434,5 +436,59 @@ public void configure(Map<String, ?> configs) {
         }
     }
 
+    @Test
+    public void testConfigDefOverrideByInitialConfigDef() {
+        String alias = "hdt";
+        String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + ".";
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias);
+        props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName());
+        ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS,
+                new ConfigDef(),
+                props,
+                false);

Review comment:
       nit: I'm sure these arguments fit on a single line, and that line would still be shorter than the one below

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
##########
@@ -434,5 +436,59 @@ public void configure(Map<String, ?> configs) {
         }
     }
 
+    @Test
+    public void testConfigDefOverrideByInitialConfigDef() {
+        String alias = "hdt";
+        String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + ".";
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias);
+        props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName());
+        ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS,
+                new ConfigDef(),
+                props,
+                false);
+        assertConfigDef(def, prefix, HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
+        assertConfigDef(def, prefix, PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.STRING);
+        assertConfigDef(def, prefix, PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
+    }
+
+    private static void assertConfigDef(ConfigDef def, String prefix, String keyName, ConfigDef.Type expectedType) {
+        assertNull(def.configKeys().get(keyName));
+        ConfigDef.ConfigKey configKey = def.configKeys().get(prefix + keyName);
+        assertNotNull(prefix + keyName + "' config must be present", configKey);
+        assertEquals(prefix + keyName + "' config should be a " + expectedType, expectedType, configKey.type);
+    }
+
+    public static class HasDuplicateConfigTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+        private static final String MUST_EXIST_KEY = "must.exist.key";
+        private static final ConfigDef CONFIG_DEF = new ConfigDef()
+                // this configDef is duplicate. It should be removed automatically so as to avoid duplicate config error.
+                .define(PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.INT, 100, ConfigDef.Importance.MEDIUM,

Review comment:
       let's use `ConfigDef.NO_DEFAULT_VALUE` as the default value in one of these definitions

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
##########
@@ -434,5 +436,59 @@ public void configure(Map<String, ?> configs) {
         }
     }
 
+    @Test
+    public void testConfigDefOverrideByInitialConfigDef() {
+        String alias = "hdt";
+        String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + ".";
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias);
+        props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName());
+        ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS,
+                new ConfigDef(),
+                props,
+                false);
+        assertConfigDef(def, prefix, HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
+        assertConfigDef(def, prefix, PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.STRING);
+        assertConfigDef(def, prefix, PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
+    }
+
+    private static void assertConfigDef(ConfigDef def, String prefix, String keyName, ConfigDef.Type expectedType) {
+        assertNull(def.configKeys().get(keyName));
+        ConfigDef.ConfigKey configKey = def.configKeys().get(prefix + keyName);
+        assertNotNull(prefix + keyName + "' config must be present", configKey);
+        assertEquals(prefix + keyName + "' config should be a " + expectedType, expectedType, configKey.type);
+    }
+
+    public static class HasDuplicateConfigTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+        private static final String MUST_EXIST_KEY = "must.exist.key";
+        private static final ConfigDef CONFIG_DEF = new ConfigDef()
+                // this configDef is duplicate. It should be removed automatically so as to avoid duplicate config error.
+                .define(PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.INT, 100, ConfigDef.Importance.MEDIUM,
+                        "fake")
+                // this configDef is duplicate. It should be removed automatically so as to avoid duplicate config error.
+                .define(PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.INT, 123, ConfigDef.Importance.MEDIUM,
+                        "fake")
+                // this configDef should appear if above duplicate configDef is removed without any error
+                .define(MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM,
+                        "this key must exist");

Review comment:
       nit: fits in one line

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
##########
@@ -434,5 +436,59 @@ public void configure(Map<String, ?> configs) {
         }
     }
 
+    @Test
+    public void testConfigDefOverrideByInitialConfigDef() {
+        String alias = "hdt";
+        String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + ".";
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias);
+        props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName());
+        ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS,
+                new ConfigDef(),
+                props,
+                false);
+        assertConfigDef(def, prefix, HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
+        assertConfigDef(def, prefix, PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.STRING);
+        assertConfigDef(def, prefix, PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
+    }
+
+    private static void assertConfigDef(ConfigDef def, String prefix, String keyName, ConfigDef.Type expectedType) {
+        assertNull(def.configKeys().get(keyName));
+        ConfigDef.ConfigKey configKey = def.configKeys().get(prefix + keyName);
+        assertNotNull(prefix + keyName + "' config must be present", configKey);
+        assertEquals(prefix + keyName + "' config should be a " + expectedType, expectedType, configKey.type);
+    }
+
+    public static class HasDuplicateConfigTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+        private static final String MUST_EXIST_KEY = "must.exist.key";
+        private static final ConfigDef CONFIG_DEF = new ConfigDef()
+                // this configDef is duplicate. It should be removed automatically so as to avoid duplicate config error.
+                .define(PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.INT, 100, ConfigDef.Importance.MEDIUM,
+                        "fake")
+                // this configDef is duplicate. It should be removed automatically so as to avoid duplicate config error.
+                .define(PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.INT, 123, ConfigDef.Importance.MEDIUM,
+                        "fake")

Review comment:
       nit: fits in one line

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
##########
@@ -434,5 +436,59 @@ public void configure(Map<String, ?> configs) {
         }
     }
 
+    @Test
+    public void testConfigDefOverrideByInitialConfigDef() {
+        String alias = "hdt";
+        String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + ".";
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias);
+        props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName());
+        ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS,
+                new ConfigDef(),
+                props,
+                false);
+        assertConfigDef(def, prefix, HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
+        assertConfigDef(def, prefix, PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.STRING);
+        assertConfigDef(def, prefix, PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
+    }
+
+    private static void assertConfigDef(ConfigDef def, String prefix, String keyName, ConfigDef.Type expectedType) {
+        assertNull(def.configKeys().get(keyName));
+        ConfigDef.ConfigKey configKey = def.configKeys().get(prefix + keyName);
+        assertNotNull(prefix + keyName + "' config must be present", configKey);
+        assertEquals(prefix + keyName + "' config should be a " + expectedType, expectedType, configKey.type);
+    }
+
+    public static class HasDuplicateConfigTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+        private static final String MUST_EXIST_KEY = "must.exist.key";
+        private static final ConfigDef CONFIG_DEF = new ConfigDef()
+                // this configDef is duplicate. It should be removed automatically so as to avoid duplicate config error.
+                .define(PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.INT, 100, ConfigDef.Importance.MEDIUM,
+                        "fake")
+                // this configDef is duplicate. It should be removed automatically so as to avoid duplicate config error.
+                .define(PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.INT, 123, ConfigDef.Importance.MEDIUM,
+                        "fake")
+                // this configDef should appear if above duplicate configDef is removed without any error
+                .define(MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM,
+                        "this key must exist");
+        @Override
+        public R apply(R record) {
+            return record;
+        }
+
+        @Override
+        public ConfigDef config() {
+            return CONFIG_DEF;
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+    }
+

Review comment:
       nit: extra blank line

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
##########
@@ -434,5 +436,59 @@ public void configure(Map<String, ?> configs) {
         }
     }
 
+    @Test
+    public void testConfigDefOverrideByInitialConfigDef() {
+        String alias = "hdt";
+        String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + ".";
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias);
+        props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName());
+        ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS,
+                new ConfigDef(),
+                props,
+                false);
+        assertConfigDef(def, prefix, HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
+        assertConfigDef(def, prefix, PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.STRING);
+        assertConfigDef(def, prefix, PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
+    }
+
+    private static void assertConfigDef(ConfigDef def, String prefix, String keyName, ConfigDef.Type expectedType) {
+        assertNull(def.configKeys().get(keyName));
+        ConfigDef.ConfigKey configKey = def.configKeys().get(prefix + keyName);
+        assertNotNull(prefix + keyName + "' config must be present", configKey);
+        assertEquals(prefix + keyName + "' config should be a " + expectedType, expectedType, configKey.type);
+    }
+
+    public static class HasDuplicateConfigTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+        private static final String MUST_EXIST_KEY = "must.exist.key";
+        private static final ConfigDef CONFIG_DEF = new ConfigDef()
+                // this configDef is duplicate. It should be removed automatically so as to avoid duplicate config error.
+                .define(PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.INT, 100, ConfigDef.Importance.MEDIUM,
+                        "fake")
+                // this configDef is duplicate. It should be removed automatically so as to avoid duplicate config error.
+                .define(PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.INT, 123, ConfigDef.Importance.MEDIUM,
+                        "fake")
+                // this configDef should appear if above duplicate configDef is removed without any error
+                .define(MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM,
+                        "this key must exist");

Review comment:
       nit: a blank line is missing here, between the `CONFIG_DEF` field and the first `@Override` method

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
##########
@@ -434,5 +436,59 @@ public void configure(Map<String, ?> configs) {
         }
     }
 
+    @Test
+    public void testConfigDefOverrideByInitialConfigDef() {
+        String alias = "hdt";
+        String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + ".";
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias);
+        props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName());
+        ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS,
+                new ConfigDef(),
+                props,
+                false);
+        assertConfigDef(def, prefix, HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
+        assertConfigDef(def, prefix, PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.STRING);
+        assertConfigDef(def, prefix, PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
+    }
+
+    private static void assertConfigDef(ConfigDef def, String prefix, String keyName, ConfigDef.Type expectedType) {

Review comment:
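       this method won't work with an empty `prefix` (the `assertNull` check on `keyName` and the `assertNotNull` check on `prefix + keyName` would then look up the same key), but that's not obvious just by reading its name.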




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org