You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/14 21:36:51 UTC
[kafka] branch trunk updated: KAFKA-14653: Use raw properties instead of post-resolution properties for MirrorMaker connectors(#13163)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b9754747d6e KAFKA-14653: Use raw properties instead of post-resolution properties for MirrorMaker connectors(#13163)
b9754747d6e is described below
commit b9754747d6eaa029c4bb69b073d749ff8df15908
Author: Dániel Urbán <48...@users.noreply.github.com>
AuthorDate: Tue Feb 14 22:36:34 2023 +0100
KAFKA-14653: Use raw properties instead of post-resolution properties for MirrorMaker connectors(#13163)
Reviewers: Chris Egerton <ch...@aiven.io>
---
.../kafka/connect/mirror/MirrorMakerConfig.java | 23 ++++++++++++---------
.../connect/mirror/MirrorMakerConfigTest.java | 24 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 10 deletions(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 3acc6bf7624..731d0b04fd1 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -87,10 +87,14 @@ public class MirrorMakerConfig extends AbstractConfig {
"Whether to bring up an internal-only REST server that allows multi-node clusters to operate correctly.";
private final Plugins plugins;
-
- public MirrorMakerConfig(Map<?, ?> props) {
+
+ private final Map<String, String> rawProperties;
+
+ public MirrorMakerConfig(Map<String, String> props) {
super(config(), props, true);
plugins = new Plugins(originalsStrings());
+
+ rawProperties = new HashMap<>(props);
}
public Set<String> clusters() {
@@ -147,7 +151,6 @@ public class MirrorMakerConfig extends AbstractConfig {
// loads properties of the form cluster.x.y.z
Map<String, String> clusterProps(String cluster) {
Map<String, String> props = new HashMap<>();
- Map<String, String> strings = originalsStrings();
props.putAll(stringsWithPrefixStripped(cluster + "."));
@@ -161,7 +164,7 @@ public class MirrorMakerConfig extends AbstractConfig {
}
for (String k : MirrorClientConfig.CLIENT_CONFIG_DEF.names()) {
- String v = strings.get(k);
+ String v = rawProperties.get(k);
if (v != null) {
props.putIfAbsent("producer." + k, v);
props.putIfAbsent("consumer." + k, v);
@@ -177,7 +180,7 @@ public class MirrorMakerConfig extends AbstractConfig {
public Map<String, String> workerConfig(SourceAndTarget sourceAndTarget) {
Map<String, String> props = new HashMap<>();
props.putAll(clusterProps(sourceAndTarget.target()));
-
+
// Accept common top-level configs that are otherwise ignored by MM2.
// N.B. all other worker properties should be configured for specific herders,
// e.g. primary->backup.client.id
@@ -190,7 +193,7 @@ public class MirrorMakerConfig extends AbstractConfig {
props.putAll(stringsWithPrefix("task"));
props.putAll(stringsWithPrefix("worker"));
props.putAll(stringsWithPrefix("replication.policy"));
-
+
// transform any expression like ${provider:path:key}, since the worker doesn't do so
props = transform(props);
props.putAll(stringsWithPrefix(CONFIG_PROVIDERS_CONFIG));
@@ -227,7 +230,7 @@ public class MirrorMakerConfig extends AbstractConfig {
public Map<String, String> connectorBaseConfig(SourceAndTarget sourceAndTarget, Class<?> connectorClass) {
Map<String, String> props = new HashMap<>();
- props.putAll(originalsStrings());
+ props.putAll(rawProperties);
props.keySet().retainAll(allConfigNames());
props.putAll(stringsWithPrefix(CONFIG_PROVIDERS_CONFIG));
@@ -298,13 +301,13 @@ public class MirrorMakerConfig extends AbstractConfig {
RestServerConfig.addInternalConfig(result);
return result;
}
-
+
private Map<String, String> stringsWithPrefixStripped(String prefix) {
- return Utils.entriesWithPrefix(originalsStrings(), prefix);
+ return Utils.entriesWithPrefix(rawProperties, prefix);
}
private Map<String, String> stringsWithPrefix(String prefix) {
- return Utils.entriesWithPrefix(originalsStrings(), prefix, false);
+ return Utils.entriesWithPrefix(rawProperties, prefix, false);
}
static Map<String, String> clusterConfigsWithPrefix(String prefix, Map<String, String> props) {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
index e2d055920dd..9c017263a8c 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -372,6 +372,30 @@ public class MirrorMakerConfigTest {
assertTrue(allNames.contains("emit.heartbeats.enabled"));
}
+ @Test
+ public void testLazyConfigResolution() {
+ MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+ "clusters", "a, b",
+ "config.providers", "fake",
+ "config.providers.fake.class", FakeConfigProvider.class.getName(),
+ "replication.policy.separator", "__",
+ "offset.storage.replication.factor", "123",
+ "b.status.storage.replication.factor", "456",
+ "b.producer.client.id", "client-one",
+ "b.security.protocol", "PLAINTEXT",
+ "b.producer.security.protocol", "SASL",
+ "ssl.truststore.password", "secret1",
+ "ssl.key.password", "${fake:secret:password}", // should not be resolved
+ "b.xxx", "yyy",
+ "b->a.topics", "${fake:secret:password}")); // should not be resolved
+ SourceAndTarget a = new SourceAndTarget("b", "a");
+ Map<String, String> props = mirrorConfig.connectorBaseConfig(a, MirrorSourceConnector.class);
+ assertEquals("${fake:secret:password}", props.get("ssl.key.password"),
+ "connector properties should not be transformed");
+ assertEquals("${fake:secret:password}", props.get("topics"),
+ "connector properties should not be transformed");
+ }
+
public static class FakeConfigProvider implements ConfigProvider {
Map<String, String> secrets = Collections.singletonMap("password", "secret2");