You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/11/14 08:40:38 UTC

[kafka] branch 2.3 updated: KAFKA-9046: Use top-level worker configs for connector admin clients

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 2568432  KAFKA-9046: Use top-level worker configs for connector admin clients
2568432 is described below

commit 2568432233374402260186727a65f7cfb4432c38
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Thu Nov 14 14:09:04 2019 +0530

    KAFKA-9046: Use top-level worker configs for connector admin clients
    
    [Jira](https://issues.apache.org/jira/browse/KAFKA-9046)
    
    The changes here are meant to find a healthy compromise between the pre- and post-KIP-458 functionality of Connect workers when configuring admin clients for use with DLQs. Before KIP-458, admin clients were configured using the top-level worker configs; after KIP-458, they are configured using worker configs with a prefix of `admin.` and then optionally overridden by connector configs with a prefix of `admin.override.`. The behavior proposed here is to use, in ascending order of precedence, the top-level worker configs (excluding those prefixed with `admin.`, `producer.`, or `consumer.`), worker configs with a prefix of `admin.` (with the prefix stripped), and connector configs with a prefix of `admin.override.`. [...]
    
    Author: Chris Egerton <ch...@confluent.io>
    
    Reviewers: Konstantine Karantasis <ko...@confluent.io>, Randall Hauch <rh...@gmail.com>, Nigel Liang <ni...@nigelliang.com>
    
    Closes #7525 from C0urante/kafka-9046
    
    (cherry picked from commit 38d243b022336ecaf5cb400ae015c485f56ff978)
    Signed-off-by: Manikumar Reddy <ma...@confluent.io>
---
 .../java/org/apache/kafka/connect/runtime/Worker.java  | 18 ++++++++++++++++--
 .../org/apache/kafka/connect/runtime/WorkerTest.java   |  7 ++++---
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 0d5448e..3d4479c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -601,8 +602,21 @@ public class Worker {
                                             Class<? extends Connector> connectorClass,
                                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
         Map<String, Object> adminProps = new HashMap<>();
-        adminProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
-        // User-specified overrides
+        // Use the top-level worker configs to retain backwards compatibility with older releases which
+        // did not require a prefix for connector admin client configs in the worker configuration file
+        // Ignore configs that begin with "admin." since those will be added next (with the prefix stripped)
+        // and those that begin with "producer." and "consumer.", since we know they aren't intended for
+        // the admin client
+        Map<String, Object> nonPrefixedWorkerConfigs = config.originals().entrySet().stream()
+            .filter(e -> !e.getKey().startsWith("admin.")
+                && !e.getKey().startsWith("producer.")
+                && !e.getKey().startsWith("consumer."))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
+            Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+        adminProps.putAll(nonPrefixedWorkerConfigs);
+
+        // Admin client-specific overrides in the worker config
         adminProps.putAll(config.originalsWithPrefix("admin."));
 
         // Connector-specified overrides
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 9cb83eb..40b4df2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -968,12 +968,15 @@ public class WorkerTest extends ThreadedTest {
         Map<String, String> props = new HashMap<>(workerProps);
         props.put("admin.client.id", "testid");
         props.put("admin.metadata.max.age.ms", "5000");
+        props.put("producer.bootstrap.servers", "cbeauho.com");
+        props.put("consumer.bootstrap.servers", "localhost:4761");
         WorkerConfig configWithOverrides = new StandaloneConfig(props);
 
         Map<String, Object> connConfig = new HashMap<String, Object>();
         connConfig.put("metadata.max.age.ms", "10000");
 
-        Map<String, String> expectedConfigs = new HashMap<>();
+        Map<String, String> expectedConfigs = new HashMap<>(workerProps);
+
         expectedConfigs.put("bootstrap.servers", "localhost:9092");
         expectedConfigs.put("client.id", "testid");
         expectedConfigs.put("metadata.max.age.ms", "10000");
@@ -983,7 +986,6 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.replayAll();
         assertEquals(expectedConfigs, Worker.adminConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
                                                              null, allConnectorClientConfigOverridePolicy));
-
     }
 
     @Test(expected = ConnectException.class)
@@ -1001,7 +1003,6 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.replayAll();
         Worker.adminConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
                                                           null, noneConnectorClientConfigOverridePolicy);
-
     }