You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/11/14 08:40:38 UTC
[kafka] branch 2.3 updated: KAFKA-9046: Use top-level worker
configs for connector admin clients
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 2568432 KAFKA-9046: Use top-level worker configs for connector admin clients
2568432 is described below
commit 2568432233374402260186727a65f7cfb4432c38
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Thu Nov 14 14:09:04 2019 +0530
KAFKA-9046: Use top-level worker configs for connector admin clients
[Jira](https://issues.apache.org/jira/browse/KAFKA-9046)
The changes here are meant to find a healthy compromise between the pre- and post-KIP-458 functionality of Connect workers when configuring admin clients for use with DLQs. Before KIP-458, admin clients were configured using the top-level worker configs; after KIP-458, they are configured using worker configs with a prefix of `admin.` and then optionally overridden by connector configs with a prefix of `admin.override.`. The behavior proposed here is to use, in ascending order of prec [...]
Author: Chris Egerton <ch...@confluent.io>
Reviewers: Konstantine Karantasis <ko...@confluent.io>, Randall Hauch <rh...@gmail.com>, Nigel Liang <ni...@nigelliang.com>
Closes #7525 from C0urante/kafka-9046
(cherry picked from commit 38d243b022336ecaf5cb400ae015c485f56ff978)
Signed-off-by: Manikumar Reddy <ma...@confluent.io>
---
.../java/org/apache/kafka/connect/runtime/Worker.java | 18 ++++++++++++++++--
.../org/apache/kafka/connect/runtime/WorkerTest.java | 7 ++++---
2 files changed, 20 insertions(+), 5 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 0d5448e..3d4479c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -601,8 +602,21 @@ public class Worker {
Class<? extends Connector> connectorClass,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
Map<String, Object> adminProps = new HashMap<>();
- adminProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
- // User-specified overrides
+ // Use the top-level worker configs to retain backwards compatibility with older releases which
+ // did not require a prefix for connector admin client configs in the worker configuration file
+ // Ignore configs that begin with "admin." since those will be added next (with the prefix stripped)
+ // and those that begin with "producer." and "consumer.", since we know they aren't intended for
+ // the admin client
+ Map<String, Object> nonPrefixedWorkerConfigs = config.originals().entrySet().stream()
+ .filter(e -> !e.getKey().startsWith("admin.")
+ && !e.getKey().startsWith("producer.")
+ && !e.getKey().startsWith("consumer."))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
+ Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+ adminProps.putAll(nonPrefixedWorkerConfigs);
+
+ // Admin client-specific overrides in the worker config
adminProps.putAll(config.originalsWithPrefix("admin."));
// Connector-specified overrides
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 9cb83eb..40b4df2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -968,12 +968,15 @@ public class WorkerTest extends ThreadedTest {
Map<String, String> props = new HashMap<>(workerProps);
props.put("admin.client.id", "testid");
props.put("admin.metadata.max.age.ms", "5000");
+ props.put("producer.bootstrap.servers", "cbeauho.com");
+ props.put("consumer.bootstrap.servers", "localhost:4761");
WorkerConfig configWithOverrides = new StandaloneConfig(props);
Map<String, Object> connConfig = new HashMap<String, Object>();
connConfig.put("metadata.max.age.ms", "10000");
- Map<String, String> expectedConfigs = new HashMap<>();
+ Map<String, String> expectedConfigs = new HashMap<>(workerProps);
+
expectedConfigs.put("bootstrap.servers", "localhost:9092");
expectedConfigs.put("client.id", "testid");
expectedConfigs.put("metadata.max.age.ms", "10000");
@@ -983,7 +986,6 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
assertEquals(expectedConfigs, Worker.adminConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
null, allConnectorClientConfigOverridePolicy));
-
}
@Test(expected = ConnectException.class)
@@ -1001,7 +1003,6 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
Worker.adminConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
null, noneConnectorClientConfigOverridePolicy);
-
}