You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/21 04:57:54 UTC
[kafka] branch 2.4 updated: KAFKA-9950: Construct new ConfigDef for
MirrorTaskConfig before defining new properties (#8608)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 2983372 KAFKA-9950: Construct new ConfigDef for MirrorTaskConfig before defining new properties (#8608)
2983372 is described below
commit 298337281550b1cbb6c78202ea2c03f3a6549740
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Wed May 20 21:14:40 2020 -0700
KAFKA-9950: Construct new ConfigDef for MirrorTaskConfig before defining new properties (#8608)
The `MirrorTaskConfig` class mutates the connector `ConfigDef` (`CONNECTOR_CONFIG_DEF`) by defining additional properties on it. This can lead to a `ConcurrentModificationException` during worker configuration validation, and it unintentionally adds those task-specific properties to the `ConfigDef` used for the connectors, where they become visible via the REST API's `/connectors/{name}/config/validate` endpoint.
The fix here is a one-liner that just creates a copy of the `ConfigDef` before defining new properties.
Reviewers: Ryanne Dolan <ry...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>
---
.../kafka/connect/mirror/MirrorTaskConfig.java | 2 +-
.../connect/mirror/MirrorConnectorConfigTest.java | 25 ++++++++++++++++++++++
2 files changed, 26 insertions(+), 1 deletion(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java
index 839d41e..73024f5 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java
@@ -59,7 +59,7 @@ public class MirrorTaskConfig extends MirrorConnectorConfig {
return metrics;
}
- protected static final ConfigDef TASK_CONFIG_DEF = CONNECTOR_CONFIG_DEF
+ protected static final ConfigDef TASK_CONFIG_DEF = new ConfigDef(CONNECTOR_CONFIG_DEF)
.define(
TASK_TOPIC_PARTITIONS,
ConfigDef.Type.LIST,
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
index 6003202..8e99779 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
@@ -17,9 +17,11 @@
package org.apache.kafka.connect.mirror;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigDef;
import org.junit.Test;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
@@ -106,4 +108,27 @@ public class MirrorConnectorConfigTest {
assertTrue(config.topicFilter().shouldReplicateTopic("topic2"));
assertFalse(config.topicFilter().shouldReplicateTopic("topic3"));
}
+
+ @Test
+ public void testNonMutationOfConfigDef() {
+ Collection<String> taskSpecificProperties = Arrays.asList(
+ MirrorConnectorConfig.TASK_TOPIC_PARTITIONS,
+ MirrorConnectorConfig.TASK_CONSUMER_GROUPS
+ );
+
+ // Sanity check to make sure that these properties are actually defined for the task config,
+ // and that the task config class has been loaded and statically initialized by the JVM
+ ConfigDef taskConfigDef = MirrorTaskConfig.TASK_CONFIG_DEF;
+ taskSpecificProperties.forEach(taskSpecificProperty -> assertTrue(
+ taskSpecificProperty + " should be defined for task ConfigDef",
+ taskConfigDef.names().contains(taskSpecificProperty)
+ ));
+
+ // Ensure that the task config class hasn't accidentally modified the connector config
+ ConfigDef connectorConfigDef = MirrorConnectorConfig.CONNECTOR_CONFIG_DEF;
+ taskSpecificProperties.forEach(taskSpecificProperty -> assertFalse(
+ taskSpecificProperty + " should not be defined for connector ConfigDef",
+ connectorConfigDef.names().contains(taskSpecificProperty)
+ ));
+ }
}