You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/14 21:36:51 UTC

[kafka] branch trunk updated: KAFKA-14653: Use raw properties instead of post-resolution properties for MirrorMaker connectors(#13163)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b9754747d6e KAFKA-14653: Use raw properties instead of post-resolution properties for MirrorMaker connectors(#13163)
b9754747d6e is described below

commit b9754747d6eaa029c4bb69b073d749ff8df15908
Author: Dániel Urbán <48...@users.noreply.github.com>
AuthorDate: Tue Feb 14 22:36:34 2023 +0100

    KAFKA-14653: Use raw properties instead of post-resolution properties for MirrorMaker connectors(#13163)
    
    Reviewers: Chris Egerton <ch...@aiven.io>
---
 .../kafka/connect/mirror/MirrorMakerConfig.java    | 23 ++++++++++++---------
 .../connect/mirror/MirrorMakerConfigTest.java      | 24 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 10 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 3acc6bf7624..731d0b04fd1 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -87,10 +87,14 @@ public class MirrorMakerConfig extends AbstractConfig {
             "Whether to bring up an internal-only REST server that allows multi-node clusters to operate correctly.";
 
     private final Plugins plugins;
-   
-    public MirrorMakerConfig(Map<?, ?> props) {
+
+    private final Map<String, String> rawProperties;
+
+    public MirrorMakerConfig(Map<String, String> props) {
         super(config(), props, true);
         plugins = new Plugins(originalsStrings());
+
+        rawProperties = new HashMap<>(props);
     }
 
     public Set<String> clusters() {
@@ -147,7 +151,6 @@ public class MirrorMakerConfig extends AbstractConfig {
     // loads properties of the form cluster.x.y.z
     Map<String, String> clusterProps(String cluster) {
         Map<String, String> props = new HashMap<>();
-        Map<String, String> strings = originalsStrings();
 
         props.putAll(stringsWithPrefixStripped(cluster + "."));
 
@@ -161,7 +164,7 @@ public class MirrorMakerConfig extends AbstractConfig {
         }
 
         for (String k : MirrorClientConfig.CLIENT_CONFIG_DEF.names()) {
-            String v = strings.get(k);
+            String v = rawProperties.get(k);
             if (v != null) {
                 props.putIfAbsent("producer." + k, v);
                 props.putIfAbsent("consumer." + k, v);
@@ -177,7 +180,7 @@ public class MirrorMakerConfig extends AbstractConfig {
     public Map<String, String> workerConfig(SourceAndTarget sourceAndTarget) {
         Map<String, String> props = new HashMap<>();
         props.putAll(clusterProps(sourceAndTarget.target()));
-      
+
         // Accept common top-level configs that are otherwise ignored by MM2.
         // N.B. all other worker properties should be configured for specific herders,
         // e.g. primary->backup.client.id
@@ -190,7 +193,7 @@ public class MirrorMakerConfig extends AbstractConfig {
         props.putAll(stringsWithPrefix("task"));
         props.putAll(stringsWithPrefix("worker"));
         props.putAll(stringsWithPrefix("replication.policy"));
- 
+
         // transform any expression like ${provider:path:key}, since the worker doesn't do so
         props = transform(props);
         props.putAll(stringsWithPrefix(CONFIG_PROVIDERS_CONFIG));
@@ -227,7 +230,7 @@ public class MirrorMakerConfig extends AbstractConfig {
     public Map<String, String> connectorBaseConfig(SourceAndTarget sourceAndTarget, Class<?> connectorClass) {
         Map<String, String> props = new HashMap<>();
 
-        props.putAll(originalsStrings());
+        props.putAll(rawProperties);
         props.keySet().retainAll(allConfigNames());
         
         props.putAll(stringsWithPrefix(CONFIG_PROVIDERS_CONFIG));
@@ -298,13 +301,13 @@ public class MirrorMakerConfig extends AbstractConfig {
         RestServerConfig.addInternalConfig(result);
         return result;
     }
- 
+
     private Map<String, String> stringsWithPrefixStripped(String prefix) {
-        return Utils.entriesWithPrefix(originalsStrings(), prefix);
+        return Utils.entriesWithPrefix(rawProperties, prefix);
     }
 
     private Map<String, String> stringsWithPrefix(String prefix) {
-        return Utils.entriesWithPrefix(originalsStrings(), prefix, false);
+        return Utils.entriesWithPrefix(rawProperties, prefix, false);
     }
 
     static Map<String, String> clusterConfigsWithPrefix(String prefix, Map<String, String> props) {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
index e2d055920dd..9c017263a8c 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -372,6 +372,30 @@ public class MirrorMakerConfigTest {
         assertTrue(allNames.contains("emit.heartbeats.enabled"));
     }
 
+    @Test
+    public void testLazyConfigResolution() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+            "clusters", "a, b",
+            "config.providers", "fake",
+            "config.providers.fake.class", FakeConfigProvider.class.getName(),
+            "replication.policy.separator", "__",
+            "offset.storage.replication.factor", "123",
+            "b.status.storage.replication.factor", "456",
+            "b.producer.client.id", "client-one",
+            "b.security.protocol", "PLAINTEXT",
+            "b.producer.security.protocol", "SASL",
+            "ssl.truststore.password", "secret1",
+            "ssl.key.password", "${fake:secret:password}",  // should not be resolved
+            "b.xxx", "yyy",
+            "b->a.topics", "${fake:secret:password}")); // should not be resolved
+        SourceAndTarget a = new SourceAndTarget("b", "a");
+        Map<String, String> props = mirrorConfig.connectorBaseConfig(a, MirrorSourceConnector.class);
+        assertEquals("${fake:secret:password}", props.get("ssl.key.password"),
+            "connector properties should not be transformed");
+        assertEquals("${fake:secret:password}", props.get("topics"),
+            "connector properties should not be transformed");
+    }
+
     public static class FakeConfigProvider implements ConfigProvider {
 
         Map<String, String> secrets = Collections.singletonMap("password", "secret2");