You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/21 04:57:54 UTC

[kafka] branch 2.4 updated: KAFKA-9950: Construct new ConfigDef for MirrorTaskConfig before defining new properties (#8608)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 2983372  KAFKA-9950: Construct new ConfigDef for MirrorTaskConfig before defining new properties (#8608)
2983372 is described below

commit 298337281550b1cbb6c78202ea2c03f3a6549740
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Wed May 20 21:14:40 2020 -0700

    KAFKA-9950: Construct new ConfigDef for MirrorTaskConfig before defining new properties (#8608)
    
    The `MirrorTaskConfig` class mutates the connector `ConfigDef` by defining additional properties on it. This can lead to a `ConcurrentModificationException` during worker configuration validation, and to the unintended inclusion of those new properties in the `ConfigDef` for the connectors, which in turn makes them visible via the REST API's `/connectors/{name}/config/validate` endpoint.
    
    The fix here is a one-liner that just creates a copy of the `ConfigDef` before defining new properties.
    
    Reviewers: Ryanne Dolan <ry...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>
---
 .../kafka/connect/mirror/MirrorTaskConfig.java     |  2 +-
 .../connect/mirror/MirrorConnectorConfigTest.java  | 25 ++++++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java
index 839d41e..73024f5 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java
@@ -59,7 +59,7 @@ public class MirrorTaskConfig extends MirrorConnectorConfig {
         return metrics;
     }
  
-    protected static final ConfigDef TASK_CONFIG_DEF = CONNECTOR_CONFIG_DEF
+    protected static final ConfigDef TASK_CONFIG_DEF = new ConfigDef(CONNECTOR_CONFIG_DEF)
         .define(
             TASK_TOPIC_PARTITIONS,
             ConfigDef.Type.LIST,
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
index 6003202..8e99779 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
@@ -17,9 +17,11 @@
 package org.apache.kafka.connect.mirror;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigDef;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
@@ -106,4 +108,27 @@ public class MirrorConnectorConfigTest {
         assertTrue(config.topicFilter().shouldReplicateTopic("topic2"));
         assertFalse(config.topicFilter().shouldReplicateTopic("topic3"));
     }
+
+    @Test
+    public void testNonMutationOfConfigDef() {
+        Collection<String> taskSpecificProperties = Arrays.asList(
+            MirrorConnectorConfig.TASK_TOPIC_PARTITIONS,
+            MirrorConnectorConfig.TASK_CONSUMER_GROUPS
+        );
+
+        // Sanity check to make sure that these properties are actually defined for the task config,
+        // and that the task config class has been loaded and statically initialized by the JVM
+        ConfigDef taskConfigDef = MirrorTaskConfig.TASK_CONFIG_DEF;
+        taskSpecificProperties.forEach(taskSpecificProperty -> assertTrue(
+            taskSpecificProperty + " should be defined for task ConfigDef",
+            taskConfigDef.names().contains(taskSpecificProperty)
+        ));
+
+        // Ensure that the task config class hasn't accidentally modified the connector config
+        ConfigDef connectorConfigDef = MirrorConnectorConfig.CONNECTOR_CONFIG_DEF;
+        taskSpecificProperties.forEach(taskSpecificProperty -> assertFalse(
+            taskSpecificProperty + " should not be defined for connector ConfigDef",
+            connectorConfigDef.names().contains(taskSpecificProperty)
+        ));
+    }
 }