You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/06/04 21:51:04 UTC

[kafka] branch 2.6 updated: KAFKA-10069: Correctly remove user-defined "predicate" and "negate" configs from transformation properties (#8755)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 87f54f0  KAFKA-10069: Correctly remove user-defined "predicate" and "negate" configs from transformation properties (#8755)
87f54f0 is described below

commit 87f54f060c67738a6318246864d3409f8790add5
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Thu Jun 4 13:13:50 2020 +0800

    KAFKA-10069: Correctly remove user-defined "predicate" and "negate" configs from transformation properties (#8755)
    
    With the recent introduction of predicated SMTs, properties named "predicate" and "negate" should be ignored and removed if they are present in transformation configs.
    This commit fixes the equality check to compare against the key of each config entry (rather than its value), so that these properties are actually removed.
    
    Reviewers: Tom Bentley <tb...@redhat.com>, Konstantine Karantasis <ko...@confluent.io>
---
 .../kafka/connect/runtime/ConnectorConfig.java     |  6 +--
 .../kafka/connect/runtime/ConnectorConfigTest.java | 49 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 37a710c..69f417b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -331,10 +331,10 @@ public class ConnectorConfig extends AbstractConfig {
                 return super.configDefsForClass(typeConfig)
                     .filter(entry -> {
                         // The implicit parameters mask any from the transformer with the same name
-                        if (PredicatedTransformation.PREDICATE_CONFIG.equals(entry.getValue())
-                                || PredicatedTransformation.NEGATE_CONFIG.equals(entry.getValue())) {
+                        if (PredicatedTransformation.PREDICATE_CONFIG.equals(entry.getKey())
+                                || PredicatedTransformation.NEGATE_CONFIG.equals(entry.getKey())) {
                             log.warn("Transformer config {} is masked by implicit config of that name",
-                                    entry.getValue());
+                                    entry.getKey());
                             return false;
                         } else {
                             return true;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index c35663b..c400e48 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -33,6 +33,8 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -434,5 +436,52 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         }
     }
 
+    @Test
+    public void testEnrichedConfigDef() {
+        String alias = "hdt";
+        String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + ".";
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias);
+        props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName());
+        ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS, new ConfigDef(), props, false);
+        assertEnrichedConfigDef(def, prefix, HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
+        assertEnrichedConfigDef(def, prefix, PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.STRING);
+        assertEnrichedConfigDef(def, prefix, PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
+    }
+
+    private static void assertEnrichedConfigDef(ConfigDef def, String prefix, String keyName, ConfigDef.Type expectedType) {
+        assertNull(def.configKeys().get(keyName));
+        ConfigDef.ConfigKey configKey = def.configKeys().get(prefix + keyName);
+        assertNotNull(prefix + keyName + "' config must be present", configKey);
+        assertEquals(prefix + keyName + "' config should be a " + expectedType, expectedType, configKey.type);
+    }
+
+    public static class HasDuplicateConfigTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+        private static final String MUST_EXIST_KEY = "must.exist.key";
+        private static final ConfigDef CONFIG_DEF = new ConfigDef()
+                // this configDef is a duplicate. It should be removed automatically to avoid a duplicate config error.
+                .define(PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.INT, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "fake")
+                // this configDef is a duplicate. It should be removed automatically to avoid a duplicate config error.
+                .define(PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.INT, 123, ConfigDef.Importance.MEDIUM, "fake")
+                // this configDef should appear if the duplicate configDefs above are removed without any error
+                .define(MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, "this key must exist");
+
+        @Override
+        public R apply(R record) {
+            return record;
+        }
+
+        @Override
+        public ConfigDef config() {
+            return CONFIG_DEF;
+        }
 
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+    }
 }