Posted to commits@kafka.apache.org by ij...@apache.org on 2019/05/17 08:37:55 UTC

[kafka] branch trunk updated: KAFKA-8265: Initial implementation for ConnectorClientConfigPolicy to enable overrides (KIP-458) (#6624)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2e91a31  KAFKA-8265: Initial implementation for ConnectorClientConfigPolicy to enable overrides (KIP-458) (#6624)
2e91a31 is described below

commit 2e91a310d7bf9e7d4d46b0bc0ca0c11cb4531e10
Author: Magesh Nandakumar <ma...@gmail.com>
AuthorDate: Fri May 17 01:37:32 2019 -0700

    KAFKA-8265: Initial implementation for ConnectorClientConfigPolicy to enable overrides (KIP-458) (#6624)
    
    Implementation to enable a policy for Connector client config overrides. This is
    implemented per KIP-458.
    
    Reviewers: Randall Hauch <rh...@gmail.com>
---
 checkstyle/import-control.xml                      |   8 +
 checkstyle/suppressions.xml                        |   3 +-
 .../kafka/clients/admin/AdminClientConfig.java     |   4 +
 .../kafka/clients/consumer/ConsumerConfig.java     |   4 +
 .../kafka/clients/producer/ProducerConfig.java     |   4 +
 .../ConnectorClientConfigOverridePolicy.java       |  47 ++++++
 .../policy/ConnectorClientConfigRequest.java       | 101 +++++++++++++
 .../kafka/connect/cli/ConnectDistributed.java      |  10 +-
 .../kafka/connect/cli/ConnectStandalone.java       |  10 +-
 ...bstractConnectorClientConfigOverridePolicy.java |  57 +++++++
 .../AllConnectorClientConfigOverridePolicy.java    |  46 ++++++
 .../NoneConnectorClientConfigOverridePolicy.java   |  47 ++++++
 ...incipalConnectorClientConfigOverridePolicy.java |  57 +++++++
 .../kafka/connect/runtime/AbstractHerder.java      | 113 +++++++++++++-
 .../kafka/connect/runtime/ConnectorConfig.java     |   5 +
 .../org/apache/kafka/connect/runtime/Worker.java   | 117 ++++++++++++---
 .../apache/kafka/connect/runtime/WorkerConfig.java |  12 +-
 .../runtime/distributed/DistributedHerder.java     |  12 +-
 .../runtime/errors/DeadLetterQueueReporter.java    |   5 +-
 .../runtime/isolation/DelegatingClassLoader.java   |  13 +-
 .../runtime/isolation/PluginScanResult.java        |  28 +++-
 .../connect/runtime/isolation/PluginType.java      |   2 +
 .../connect/runtime/isolation/PluginUtils.java     |   1 +
 .../kafka/connect/runtime/isolation/Plugins.java   |   7 +-
 .../runtime/standalone/StandaloneHerder.java       |  12 +-
 ...ctor.policy.ConnectorClientConfigOverridePolicy |  18 +++
 ...aseConnectorClientConfigOverridePolicyTest.java |  59 ++++++++
 ...oneConnectorClientConfigOverridePolicyTest.java |  49 ++++++
 ...palConnectorClientConfigOverridePolicyTest.java |  50 ++++++
 .../ConnectorCientPolicyIntegrationTest.java       | 146 ++++++++++++++++++
 .../kafka/connect/runtime/AbstractHerderTest.java  |  82 ++++++++--
 .../apache/kafka/connect/runtime/WorkerTest.java   | 167 ++++++++++++++++++---
 .../runtime/distributed/DistributedHerderTest.java |   6 +-
 .../runtime/standalone/StandaloneHerderTest.java   |   8 +-
 .../util/clusters/EmbeddedConnectCluster.java      |  24 ++-
 35 files changed, 1246 insertions(+), 88 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 67cbe5a..a76fd1d 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -322,6 +322,13 @@
       <allow pkg="org.apache.kafka.connect.storage" />
     </subpackage>
 
+    <subpackage name="connector.policy">
+      <allow pkg="org.apache.kafka.connect.health" />
+      <allow pkg="org.apache.kafka.connect.connector" />
+      <!-- for testing -->
+      <allow pkg="org.apache.kafka.connect.runtime" />
+    </subpackage>
+
     <subpackage name="rest">
       <allow pkg="org.apache.kafka.connect.health" />
       <allow pkg="javax.ws.rs" />
@@ -356,6 +363,7 @@
       <allow pkg="org.apache.kafka.connect.storage" />
       <allow pkg="org.apache.kafka.connect.util" />
       <allow pkg="org.apache.kafka.common" />
+      <allow pkg="org.apache.kafka.connect.connector.policy" />
     </subpackage>
 
     <subpackage name="storage">
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0c65edc..977a7ac 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -81,7 +81,8 @@
     <!-- Connect -->
     <suppress checks="ClassFanOutComplexity"
               files="DistributedHerder(|Test).java"/>
-
+    <suppress checks="ClassFanOutComplexity"
+              files="Worker.java"/>
     <suppress checks="MethodLength"
               files="(KafkaConfigBackingStore|RequestResponseTest|WorkerSinkTaskTest).java"/>
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 47c76ac..1fa2335 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -200,6 +200,10 @@ public class AdminClientConfig extends AbstractConfig {
         return CONFIG.names();
     }
 
+    public static ConfigDef configDef() {
+        return CONFIG;
+    }
+
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index c9b5004..ba1928e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -531,6 +531,10 @@ public class ConsumerConfig extends AbstractConfig {
         return CONFIG.names();
     }
 
+    public static ConfigDef configDef() {
+        return CONFIG;
+    }
+
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 758f858..396d91e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -404,6 +404,10 @@ public class ProducerConfig extends AbstractConfig {
         return CONFIG.names();
     }
 
+    public static ConfigDef configDef() {
+        return CONFIG;
+    }
+
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java
new file mode 100644
index 0000000..94e5fd6
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.ConfigValue;
+
+import java.util.List;
+
+/**
+ * <p>An interface for enforcing a policy on overriding client configs via the connector configs.
+ *
+ * <p>Common use cases are the ability to provide a principal per connector via <code>sasl.jaas.config</code>
+ * and enforcing that producer/consumer configuration overrides for optimizations are within acceptable ranges.
+ */
+public interface ConnectorClientConfigOverridePolicy extends Configurable, AutoCloseable {
+
+
+    /**
+     * Workers will invoke this while constructing the producer for SourceConnectors, the DLQ producer for SinkConnectors and the
+     * consumer for SinkConnectors, to validate whether all of the overridden client configurations are allowed per the
+     * policy implementation. This is also invoked when connector configs are validated via the REST API.
+     *
+     * If there are any policy violations, the connector will not be started.
+     *
+     * @param connectorClientConfigRequest an instance of {@code ConnectorClientConfigRequest} that provides the configs to be overridden and
+     *                                     its context; never {@code null}
+     * @return list of {@link ConfigValue} instances that describe each client configuration in the request and include an
+     *         {@link ConfigValue#errorMessages error} if the configuration is not allowed by the policy; never {@code null}
+     */
+    List<ConfigValue> validate(ConnectorClientConfigRequest connectorClientConfigRequest);
+}
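
For illustration, a minimal custom policy written directly against this interface
(the class name and its single allowed key are hypothetical, not part of this
change; the built-in policies below use an abstract base class instead):

    import org.apache.kafka.common.config.ConfigValue;
    import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
    import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Hypothetical policy: only "compression.type" may be overridden.
    public class CompressionOnlyPolicy implements ConnectorClientConfigOverridePolicy {

        @Override
        public void configure(Map<String, ?> configs) {
            // This sketch has no settings of its own.
        }

        @Override
        public List<ConfigValue> validate(ConnectorClientConfigRequest request) {
            List<ConfigValue> results = new ArrayList<>();
            for (Map.Entry<String, Object> entry : request.clientProps().entrySet()) {
                ConfigValue value = new ConfigValue(entry.getKey(), entry.getValue(),
                                                    new ArrayList<>(), new ArrayList<>());
                if (!"compression.type".equals(entry.getKey())) {
                    value.addErrorMessage("Only 'compression.type' may be overridden");
                }
                results.add(value);
            }
            return results;
        }

        @Override
        public void close() {
        }
    }
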
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigRequest.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigRequest.java
new file mode 100644
index 0000000..11f756b
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigRequest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.health.ConnectorType;
+
+import java.util.Map;
+
+public class ConnectorClientConfigRequest {
+
+    private Map<String, Object> clientProps;
+    private ClientType clientType;
+    private String connectorName;
+    private ConnectorType connectorType;
+    private Class<? extends Connector> connectorClass;
+
+    public ConnectorClientConfigRequest(
+        String connectorName,
+        ConnectorType connectorType,
+        Class<? extends Connector> connectorClass,
+        Map<String, Object> clientProps,
+        ClientType clientType) {
+        this.clientProps = clientProps;
+        this.clientType = clientType;
+        this.connectorName = connectorName;
+        this.connectorType = connectorType;
+        this.connectorClass = connectorClass;
+    }
+
+    /**
+     * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SOURCE}.
+     * Provides Config with prefix {@code consumer.override.} for {@link ConnectorType#SINK}.
+     * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SINK} for DLQ.
+     * Provides Config with prefix {@code admin.override.} for {@link ConnectorType#SINK} for DLQ.
+     *
+     * @return The client properties specified in the Connector Config with prefixes {@code producer.override.},
+     * {@code consumer.override.} and {@code admin.override.}. The returned keys do not include the prefixes.
+     */
+    public Map<String, Object> clientProps() {
+        return clientProps;
+    }
+
+    /**
+     * {@link ClientType#PRODUCER} for {@link ConnectorType#SOURCE}
+     * {@link ClientType#CONSUMER} for {@link ConnectorType#SINK}
+     * {@link ClientType#PRODUCER} for DLQ in {@link ConnectorType#SINK}
+     * {@link ClientType#ADMIN} for DLQ topic creation in {@link ConnectorType#SINK}
+     *
+     * @return enumeration specifying the client type that is being overridden by the worker; never null.
+     */
+    public ClientType clientType() {
+        return clientType;
+    }
+
+    /**
+     * Name of the connector specified in the connector config.
+     *
+     * @return name of the connector; never null.
+     */
+    public String connectorName() {
+        return connectorName;
+    }
+
+    /**
+     * Type of the Connector.
+     *
+     * @return enumeration specifying the type of the connector {@link ConnectorType#SINK} or {@link ConnectorType#SOURCE}.
+     */
+    public ConnectorType connectorType() {
+        return connectorType;
+    }
+
+    /**
+     * The class of the Connector.
+     *
+     * @return the class of the Connector being created; never null
+     */
+    public Class<? extends Connector> connectorClass() {
+        return connectorClass;
+    }
+
+    public enum ClientType {
+        PRODUCER, CONSUMER, ADMIN;
+    }
+}
\ No newline at end of file
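
For context, the worker builds one of these requests per client type, with the
override prefix already stripped from the keys. A sketch of what a policy sees
(the connector name is hypothetical; the policy and connector class are passed in):

    import org.apache.kafka.common.config.ConfigValue;
    import org.apache.kafka.connect.connector.Connector;
    import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
    import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
    import org.apache.kafka.connect.health.ConnectorType;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RequestSketch {
        // "producer.override.acks" in the connector config arrives here as "acks".
        static List<ConfigValue> check(ConnectorClientConfigOverridePolicy policy,
                                       Class<? extends Connector> connectorClass) {
            Map<String, Object> clientProps = new HashMap<>();
            clientProps.put("acks", "all");
            ConnectorClientConfigRequest request = new ConnectorClientConfigRequest(
                "my-source",                 // hypothetical connector name
                ConnectorType.SOURCE,
                connectorClass,
                clientProps,
                ConnectorClientConfigRequest.ClientType.PRODUCER);
            return policy.validate(request);
        }
    }
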
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 17d65ac..22c1ad8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -19,8 +19,10 @@ package org.apache.kafka.connect.cli;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
@@ -102,7 +104,11 @@ public class ConnectDistributed {
         KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
         offsetBackingStore.configure(config);
 
-        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
+        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
+                config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
+                config, ConnectorClientConfigOverridePolicy.class);
+
+        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
 
         Converter internalValueConverter = worker.getInternalValueConverter();
@@ -116,7 +122,7 @@ public class ConnectDistributed {
 
         DistributedHerder herder = new DistributedHerder(config, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl.toString());
+                advertisedUrl.toString(), connectorClientConfigOverridePolicy);
 
         final Connect connect = new Connect(herder, rest);
         log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 499e6df..cf7b93b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -19,10 +19,12 @@ package org.apache.kafka.connect.cli;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.RestServer;
@@ -87,9 +89,13 @@ public class ConnectStandalone {
             URI advertisedUrl = rest.advertisedUrl();
             String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-            Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore());
+            ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
+                config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
+                config, ConnectorClientConfigOverridePolicy.class);
+            Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore(),
+                                       connectorClientConfigOverridePolicy);
 
-            Herder herder = new StandaloneHerder(worker, kafkaClusterId);
+            Herder herder = new StandaloneHerder(worker, kafkaClusterId, connectorClientConfigOverridePolicy);
             final Connect connect = new Connect(herder, rest);
             log.info("Kafka Connect standalone worker initialization took {}ms", time.hiResClockMs() - initStart);
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
new file mode 100644
index 0000000..3c310db
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.common.config.ConfigValue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class AbstractConnectorClientConfigOverridePolicy implements ConnectorClientConfigOverridePolicy {
+
+    @Override
+    public void close() throws Exception {
+
+    }
+
+    @Override
+    public final List<ConfigValue> validate(ConnectorClientConfigRequest connectorClientConfigRequest) {
+        Map<String, Object> inputConfig = connectorClientConfigRequest.clientProps();
+        return inputConfig.entrySet().stream().map(this::configValue).collect(Collectors.toList());
+    }
+
+    protected ConfigValue configValue(Map.Entry<String, Object> configEntry) {
+        ConfigValue configValue =
+            new ConfigValue(configEntry.getKey(), configEntry.getValue(), new ArrayList<>(), new ArrayList<String>());
+        validate(configValue);
+        return configValue;
+    }
+
+    protected void validate(ConfigValue configValue) {
+        if (!isAllowed(configValue)) {
+            configValue.addErrorMessage("The '" + policyName() + "' policy does not allow '" + configValue.name()
+                                        + "' to be overridden in the connector configuration.");
+        }
+    }
+
+    protected abstract String policyName();
+
+    protected abstract boolean isAllowed(ConfigValue configValue);
+}
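
Custom policies will typically extend this base class and supply only
policyName() and isAllowed(...). A sketch with a hypothetical allow-list
(the class name and allowed keys are illustrative, not part of this change):

    import org.apache.kafka.common.config.ConfigValue;
    import org.apache.kafka.connect.connector.policy.AbstractConnectorClientConfigOverridePolicy;

    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class LatencyTuningPolicy extends AbstractConnectorClientConfigOverridePolicy {

        private static final Set<String> ALLOWED =
            Stream.of("linger.ms", "batch.size").collect(Collectors.toSet());

        @Override
        public void configure(Map<String, ?> configs) {
        }

        @Override
        protected String policyName() {
            return "LatencyTuning";
        }

        @Override
        protected boolean isAllowed(ConfigValue configValue) {
            return ALLOWED.contains(configValue.name());
        }
    }
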
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllConnectorClientConfigOverridePolicy.java b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllConnectorClientConfigOverridePolicy.java
new file mode 100644
index 0000000..ecd76ba
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllConnectorClientConfigOverridePolicy.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.common.config.ConfigValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Allows all client configurations to be overridden via the connector configs by setting {@code client.config.policy} to {@code All}.
+ */
+public class AllConnectorClientConfigOverridePolicy extends AbstractConnectorClientConfigOverridePolicy {
+    private static final Logger log = LoggerFactory.getLogger(AllConnectorClientConfigOverridePolicy.class);
+
+    @Override
+    protected String policyName() {
+        return "All";
+    }
+
+    @Override
+    protected boolean isAllowed(ConfigValue configValue) {
+        return true;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        log.info("Setting up All Policy for ConnectorClientConfigOverride. This will allow all client configurations to be overridden");
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/NoneConnectorClientConfigOverridePolicy.java b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/NoneConnectorClientConfigOverridePolicy.java
new file mode 100644
index 0000000..8236c89
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/NoneConnectorClientConfigOverridePolicy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.common.config.ConfigValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Disallows any client configuration from being overridden via the connector configs by setting {@code client.config.policy} to {@code None}.
+ * This is the default behavior.
+ */
+public class NoneConnectorClientConfigOverridePolicy extends AbstractConnectorClientConfigOverridePolicy {
+    private static final Logger log = LoggerFactory.getLogger(NoneConnectorClientConfigOverridePolicy.class);
+
+    @Override
+    protected String policyName() {
+        return "None";
+    }
+
+    @Override
+    protected boolean isAllowed(ConfigValue configValue) {
+        return false;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        log.info("Setting up None Policy for ConnectorClientConfigOverride. This will disallow any client configuration to be overridden");
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java
new file mode 100644
index 0000000..7d80d80
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Allows all {@code sasl} configurations to be overridden via the connector configs by setting {@code client.config.policy} to
+ * {@code Principal}. This allows setting a principal per connector.
+ */
+public class PrincipalConnectorClientConfigOverridePolicy extends AbstractConnectorClientConfigOverridePolicy {
+    private static final Logger log = LoggerFactory.getLogger(PrincipalConnectorClientConfigOverridePolicy.class);
+
+    private static final Set<String> ALLOWED_CONFIG =
+        Stream.of(SaslConfigs.SASL_JAAS_CONFIG, SaslConfigs.SASL_MECHANISM, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG).
+            collect(Collectors.toSet());
+
+    @Override
+    protected String policyName() {
+        return "Principal";
+    }
+
+    @Override
+    protected boolean isAllowed(ConfigValue configValue) {
+        return ALLOWED_CONFIG.contains(configValue.name());
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        log.info("Setting up Principal policy for ConnectorClientConfigOverride. This will allow `sasl` client configuration to be "
+                 + "overridden.");
+    }
+}
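
A connector configuration fragment that this policy would accept, shown as a
map for illustration (connector name, class and credentials are hypothetical);
a non-SASL key such as producer.override.acks would be rejected:

    import java.util.HashMap;
    import java.util.Map;

    public class PrincipalOverrideSketch {
        public static Map<String, String> connectorProps() {
            Map<String, String> props = new HashMap<>();
            props.put("name", "my-source");                                // hypothetical
            props.put("connector.class", "org.example.MySourceConnector"); // hypothetical
            // Allowed by the Principal policy:
            props.put("producer.override.security.protocol", "SASL_PLAINTEXT");
            props.put("producer.override.sasl.mechanism", "PLAIN");
            props.put("producer.override.sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"alice\" password=\"alice-secret\";");
            return props;
        }
    }
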
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 8e7d016..e92f55e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.ConfigKey;
@@ -23,6 +25,8 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -87,6 +91,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     private final String kafkaClusterId;
     protected final StatusBackingStore statusBackingStore;
     protected final ConfigBackingStore configBackingStore;
+    private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
 
     private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
 
@@ -94,13 +99,15 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
                           String workerId,
                           String kafkaClusterId,
                           StatusBackingStore statusBackingStore,
-                          ConfigBackingStore configBackingStore) {
+                          ConfigBackingStore configBackingStore,
+                          ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
         this.worker = worker;
         this.worker.herder = this;
         this.workerId = workerId;
         this.kafkaClusterId = kafkaClusterId;
         this.statusBackingStore = statusBackingStore;
         this.configBackingStore = configBackingStore;
+        this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
     }
 
     @Override
@@ -280,14 +287,17 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
             throw new BadRequestException("Connector config " + connectorProps + " contains no connector type");
 
         Connector connector = getConnector(connType);
+        org.apache.kafka.connect.health.ConnectorType connectorType;
         ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
         try {
             ConfigDef baseConfigDef;
             if (connector instanceof SourceConnector) {
                 baseConfigDef = SourceConnectorConfig.configDef();
+                connectorType = org.apache.kafka.connect.health.ConnectorType.SOURCE;
             } else {
                 baseConfigDef = SinkConnectorConfig.configDef();
                 SinkConnectorConfig.validate(connectorProps);
+                connectorType = org.apache.kafka.connect.health.ConnectorType.SINK;
             }
             ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(), baseConfigDef, connectorProps, false);
             Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(
@@ -321,12 +331,105 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
             configKeys.putAll(configDef.configKeys());
             allGroups.addAll(configDef.groups());
             configValues.addAll(config.configValues());
-            return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
+            ConfigInfos configInfos = generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
+
+            AbstractConfig connectorConfig = new AbstractConfig(new ConfigDef(), connectorProps);
+            String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG);
+            ConfigInfos producerConfigInfos = null;
+            ConfigInfos consumerConfigInfos = null;
+            ConfigInfos adminConfigInfos = null;
+            if (connectorType.equals(org.apache.kafka.connect.health.ConnectorType.SOURCE)) {
+                producerConfigInfos = validateClientOverrides(connName,
+                                                              ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
+                                                              connectorConfig,
+                                                              ProducerConfig.configDef(),
+                                                              connector.getClass(),
+                                                              connectorType,
+                                                              ConnectorClientConfigRequest.ClientType.PRODUCER,
+                                                              connectorClientConfigOverridePolicy);
+                return mergeConfigInfos(connType, configInfos, producerConfigInfos);
+            } else {
+                consumerConfigInfos = validateClientOverrides(connName,
+                                                              ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
+                                                              connectorConfig,
+                                                              ConsumerConfig.configDef(),
+                                                              connector.getClass(),
+                                                              connectorType,
+                                                              ConnectorClientConfigRequest.ClientType.CONSUMER,
+                                                              connectorClientConfigOverridePolicy);
+                // check if topic for dead letter queue exists
+                String topic = connectorProps.get(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG);
+                if (topic != null && !topic.isEmpty()) {
+                    adminConfigInfos = validateClientOverrides(connName,
+                                                               ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
+                                                               connectorConfig,
+                                                               AdminClientConfig.configDef(),
+                                                               connector.getClass(),
+                                                               connectorType,
+                                                               ConnectorClientConfigRequest.ClientType.ADMIN,
+                                                               connectorClientConfigOverridePolicy);
+                }
+
+            }
+            return mergeConfigInfos(connType, configInfos, producerConfigInfos, consumerConfigInfos, adminConfigInfos);
         } finally {
             Plugins.compareAndSwapLoaders(savedLoader);
         }
     }
 
+    private static ConfigInfos mergeConfigInfos(String connType, ConfigInfos... configInfosList) {
+        int errorCount = 0;
+        List<ConfigInfo> configInfoList = new LinkedList<>();
+        Set<String> groups = new LinkedHashSet<>();
+        for (ConfigInfos configInfos : configInfosList) {
+            if (configInfos != null) {
+                errorCount += configInfos.errorCount();
+                configInfoList.addAll(configInfos.values());
+                groups.addAll(configInfos.groups());
+            }
+        }
+        return new ConfigInfos(connType, errorCount, new ArrayList<>(groups), configInfoList);
+    }
+
+    private static ConfigInfos validateClientOverrides(String connName,
+                                                      String prefix,
+                                                      AbstractConfig connectorConfig,
+                                                      ConfigDef configDef,
+                                                      Class<? extends Connector> connectorClass,
+                                                      org.apache.kafka.connect.health.ConnectorType connectorType,
+                                                      ConnectorClientConfigRequest.ClientType clientType,
+                                                      ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+        int errorCount = 0;
+        List<ConfigInfo> configInfoList = new LinkedList<>();
+        Map<String, ConfigKey> configKeys = configDef.configKeys();
+        Set<String> groups = new LinkedHashSet<>();
+        Map<String, Object> clientConfigs = connectorConfig.originalsWithPrefix(prefix);
+        ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
+            connName, connectorType, connectorClass, clientConfigs, clientType);
+        List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
+        if (configValues != null) {
+            for (ConfigValue validatedConfigValue : configValues) {
+                ConfigKey configKey = configKeys.get(validatedConfigValue.name());
+                ConfigKeyInfo configKeyInfo = null;
+                if (configKey != null) {
+                    if (configKey.group != null) {
+                        groups.add(configKey.group);
+                    }
+                    configKeyInfo = convertConfigKey(configKey, prefix);
+                }
+
+                ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(),
+                                                          validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
+                if (configValue.errorMessages().size() > 0) {
+                    errorCount++;
+                }
+                ConfigValueInfo configValueInfo = convertConfigValue(configValue, configKey != null ? configKey.type : null);
+                configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo));
+            }
+        }
+        return new ConfigInfos(connectorClass.toString(), errorCount, new ArrayList<>(groups), configInfoList);
+    }
+
     // public for testing
     public static ConfigInfos generateResult(String connType, Map<String, ConfigKey> configKeys, List<ConfigValue> configValues, List<String> groups) {
         int errorCount = 0;
@@ -358,7 +461,11 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     }
 
     private static ConfigKeyInfo convertConfigKey(ConfigKey configKey) {
-        String name = configKey.name;
+        return convertConfigKey(configKey, "");
+    }
+
+    private static ConfigKeyInfo convertConfigKey(ConfigKey configKey, String prefix) {
+        String name = prefix + configKey.name;
         Type type = configKey.type;
         String typeName = configKey.type.name();
 
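Because validateClientOverrides re-attaches the prefix via convertConfigKey, a
rejected override surfaces in the REST validation response under its prefixed
name. A sketch of the resulting ConfigValue under the default None policy
(values hypothetical; the message format comes from the abstract policy above):

    import org.apache.kafka.common.config.ConfigValue;

    import java.util.ArrayList;

    public class PrefixedResultSketch {
        public static ConfigValue rejectedOverride() {
            ConfigValue value = new ConfigValue("producer.override.acks", "1",
                                                new ArrayList<>(), new ArrayList<>());
            value.addErrorMessage("The 'None' policy does not allow 'acks' to be "
                                  + "overridden in the connector configuration.");
            return value;
        }
    }
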
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 8889aad..1cebc94 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -140,6 +140,11 @@ public class ConnectorConfig extends AbstractConfig {
             "a failure. This is 'false' by default, which will prevent record keys, values, and headers from being written to log files, " +
             "although some information such as topic and partition number will still be logged.";
 
+
+    public static final String CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX = "producer.override.";
+    public static final String CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX = "consumer.override.";
+    public static final String CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX = "admin.override.";
+
     private final EnrichedConnectorConfig enrichedConfig;
     private static class EnrichedConnectorConfig extends AbstractConfig {
         EnrichedConnectorConfig(ConfigDef configDef, Map<String, String> props) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index f6f7452..f848a18 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Frequencies;
@@ -30,7 +31,10 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.health.ConnectorType;
 import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
@@ -68,6 +72,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
 
 
 /**
@@ -98,15 +103,16 @@ public class Worker {
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
     private WorkerConfigTransformer workerConfigTransformer;
+    private ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
 
     public Worker(
-            String workerId,
-            Time time,
-            Plugins plugins,
-            WorkerConfig config,
-            OffsetBackingStore offsetBackingStore
-    ) {
-        this(workerId, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool());
+        String workerId,
+        Time time,
+        Plugins plugins,
+        WorkerConfig config,
+        OffsetBackingStore offsetBackingStore,
+        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+        this(workerId, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool(), connectorClientConfigOverridePolicy);
     }
 
     @SuppressWarnings("deprecation")
@@ -116,7 +122,8 @@ public class Worker {
             Plugins plugins,
             WorkerConfig config,
             OffsetBackingStore offsetBackingStore,
-            ExecutorService executorService
+            ExecutorService executorService,
+            ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy
     ) {
         this.metrics = new ConnectMetrics(workerId, config, time);
         this.executor = executorService;
@@ -124,6 +131,7 @@ public class Worker {
         this.time = time;
         this.plugins = plugins;
         this.config = config;
+        this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
         this.workerMetricsGroup = new WorkerMetricsGroup(metrics);
 
         // Internal converters are required properties, thus getClass won't return null.
@@ -486,7 +494,8 @@ public class Worker {
                                        HeaderConverter headerConverter,
                                        ClassLoader loader) {
         ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
-
+        final Class<? extends Connector> connectorClass = plugins.connectorClass(
+            connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
         RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(),
                 connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
@@ -500,7 +509,8 @@ public class Worker {
                     internalKeyConverter, internalValueConverter);
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
-            Map<String, Object> producerProps = producerConfigs("connector-producer-" + id, config);
+            Map<String, Object> producerProps = producerConfigs(id, "connector-producer-" + id, config, connConfig, connectorClass,
+                                                                connectorClientConfigOverridePolicy);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
 
             // Note we pass the configState as it performs dynamic transformations under the covers
@@ -511,9 +521,9 @@ public class Worker {
             TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
-            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics));
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
 
-            Map<String, Object> consumerProps = consumerConfigs(id, config);
+            Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
 
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
@@ -525,7 +535,12 @@ public class Worker {
         }
     }
 
-    static Map<String, Object> producerConfigs(String defaultClientId, WorkerConfig config) {
+    static Map<String, Object> producerConfigs(ConnectorTaskId id,
+                                               String defaultClientId,
+                                               WorkerConfig config,
+                                               ConnectorConfig connConfig,
+                                               Class<? extends Connector> connectorClass,
+                                               ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
         Map<String, Object> producerProps = new HashMap<>();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
@@ -540,11 +555,22 @@ public class Worker {
         producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, defaultClientId);
         // User-specified overrides
         producerProps.putAll(config.originalsWithPrefix("producer."));
+
+        // Connector-specified overrides
+        Map<String, Object> producerOverrides =
+            connectorClientConfigOverrides(id, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
+                                           ConnectorType.SOURCE, ConnectorClientConfigRequest.ClientType.PRODUCER,
+                                           connectorClientConfigOverridePolicy);
+        producerProps.putAll(producerOverrides);
+
         return producerProps;
     }
 
-
-    static Map<String, Object> consumerConfigs(ConnectorTaskId id, WorkerConfig config) {
+    static Map<String, Object> consumerConfigs(ConnectorTaskId id,
+                                               WorkerConfig config,
+                                               ConnectorConfig connConfig,
+                                               Class<? extends Connector> connectorClass,
+                                               ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
         // Include any unknown worker configs so consumer configs can be set globally on the worker
         // and through to the task
         Map<String, Object> consumerProps = new HashMap<>();
@@ -559,15 +585,68 @@ public class Worker {
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 
         consumerProps.putAll(config.originalsWithPrefix("consumer."));
+        // Connector-specified overrides
+        Map<String, Object> consumerOverrides =
+            connectorClientConfigOverrides(id, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
+                                           ConnectorType.SINK, ConnectorClientConfigRequest.ClientType.CONSUMER,
+                                           connectorClientConfigOverridePolicy);
+        consumerProps.putAll(consumerOverrides);
+
         return consumerProps;
     }
 
+    static Map<String, Object> adminConfigs(ConnectorTaskId id,
+                                            WorkerConfig config,
+                                            ConnectorConfig connConfig,
+                                            Class<? extends Connector> connectorClass,
+                                            ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+        Map<String, Object> adminProps = new HashMap<>();
+        adminProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+        // User-specified overrides
+        adminProps.putAll(config.originalsWithPrefix("admin."));
+
+        // Connector-specified overrides
+        Map<String, Object> adminOverrides =
+            connectorClientConfigOverrides(id, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
+                                           ConnectorType.SINK, ConnectorClientConfigRequest.ClientType.ADMIN,
+                                           connectorClientConfigOverridePolicy);
+        adminProps.putAll(adminOverrides);
+
+        return adminProps;
+    }
+
+    private static Map<String, Object> connectorClientConfigOverrides(ConnectorTaskId id,
+                                                                      ConnectorConfig connConfig,
+                                                                      Class<? extends Connector> connectorClass,
+                                                                      String clientConfigPrefix,
+                                                                      ConnectorType connectorType,
+                                                                      ConnectorClientConfigRequest.ClientType clientType,
+                                                                      ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+        Map<String, Object> clientOverrides = connConfig.originalsWithPrefix(clientConfigPrefix);
+        ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
+            id.connector(),
+            connectorType,
+            connectorClass,
+            clientOverrides,
+            clientType
+        );
+        List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
+        List<ConfigValue> errorConfigs = configValues.stream().
+            filter(configValue -> configValue.errorMessages().size() > 0).collect(Collectors.toList());
+        // These should be caught when the herder validates the connector configuration, but just in case
+        if (errorConfigs.size() > 0) {
+            throw new ConnectException("Client Config Overrides not allowed " + errorConfigs);
+        }
+        return clientOverrides;
+    }
+
     ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         return new ErrorHandlingMetrics(id, metrics);
     }
 
     private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, SinkConnectorConfig connConfig,
-                                                  ErrorHandlingMetrics errorHandlingMetrics) {
+                                                  ErrorHandlingMetrics errorHandlingMetrics,
+                                                  Class<? extends Connector> connectorClass) {
         ArrayList<ErrorReporter> reporters = new ArrayList<>();
         LogReporter logReporter = new LogReporter(id, connConfig, errorHandlingMetrics);
         reporters.add(logReporter);
@@ -575,8 +654,10 @@ public class Worker {
         // check if topic for dead letter queue exists
         String topic = connConfig.dlqTopicName();
         if (topic != null && !topic.isEmpty()) {
-            Map<String, Object> producerProps = producerConfigs("connector-dlq-producer-" + id, config);
-            DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps, errorHandlingMetrics);
+            Map<String, Object> producerProps = producerConfigs(id, "connector-dlq-producer-" + id, config, connConfig, connectorClass,
+                                                                connectorClientConfigOverridePolicy);
+            Map<String, Object> adminProps = adminConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
+            DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(adminProps, id, connConfig, producerProps, errorHandlingMetrics);
             reporters.add(reporter);
         }
 
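Net effect of producerConfigs/consumerConfigs/adminConfigs above: each client
config is assembled in three layers, with later layers winning. A schematic
with hypothetical values:

    import java.util.HashMap;
    import java.util.Map;

    public class OverridePrecedenceSketch {
        public static void main(String[] args) {
            Map<String, Object> producerProps = new HashMap<>();
            // 1. Worker defaults, hard-coded in producerConfigs
            producerProps.put("acks", "all");
            // 2. Worker-level overrides, configured as "producer.<key>"
            producerProps.put("compression.type", "lz4");
            // 3. Connector-level overrides, configured as "producer.override.<key>"
            //    and applied only if the configured policy allows them
            producerProps.put("acks", "1"); // replaces the worker default
            System.out.println(producerProps);
        }
    }
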
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index efa92b5..773358f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -212,6 +212,14 @@ public class WorkerConfig extends AbstractConfig {
             + "<code>ConnectRestExtension</code> allows you to inject into Connect's REST API user defined resources like filters. "
             + "Typically used to add custom capability like logging, security, etc. ";
 
+    public static final String CONNECTOR_CLIENT_POLICY_CLASS_CONFIG = "client.config.policy";
+    public static final String CONNECTOR_CLIENT_POLICY_CLASS_DOC =
+        "Class name or alias of implementation of <code>ConnectorClientConfigOverridePolicy</code>. Defines what client configurations can be "
+        + "overriden by the connector. The default implementation is `None`. The other possible policies in the framework include `All` "
+        + "and `Principal`. ";
+    public static final String CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT = "None";
+
+
     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
     public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
     public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
@@ -289,7 +297,9 @@ public class WorkerConfig extends AbstractConfig {
                         Collections.emptyList(),
                         Importance.LOW, CONFIG_PROVIDERS_DOC)
                 .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
-                        Importance.LOW, REST_EXTENSION_CLASSES_DOC);
+                        Importance.LOW, REST_EXTENSION_CLASSES_DOC)
+                .define(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, Type.STRING, CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT,
+                        Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC);
     }
 
     private void logInternalConverterDeprecationWarnings(Map<String, String> props) {
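For operators, enabling one of the built-in policies is a single line in the worker properties, using the
client.config.policy key defined above. A minimal excerpt (the surrounding values are illustrative):

    # connect-distributed.properties (excerpt)
    bootstrap.servers=localhost:9092
    group.id=connect-cluster
    # Restrict connector client overrides to security/principal-related configs
    client.config.policy=Principal
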
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 94ca734..585836e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
@@ -165,8 +166,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                              String kafkaClusterId,
                              StatusBackingStore statusBackingStore,
                              ConfigBackingStore configBackingStore,
-                             String restUrl) {
-        this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(), time);
+                             String restUrl,
+                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+        this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(),
+             time, connectorClientConfigOverridePolicy);
         configBackingStore.setUpdateListener(new ConfigUpdateListener());
     }
 
@@ -180,8 +183,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                       WorkerGroupMember member,
                       String restUrl,
                       ConnectMetrics metrics,
-                      Time time) {
-        super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore);
+                      Time time,
+                      ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+        super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy);
 
         this.time = time;
         this.herderMetrics = new HerderMetrics(metrics);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 2312269..c78026f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
-import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,13 +70,13 @@ public class DeadLetterQueueReporter implements ErrorReporter {
 
     private KafkaProducer<byte[], byte[]> kafkaProducer;
 
-    public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
+    public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminProps,
                                                          ConnectorTaskId id,
                                                          SinkConnectorConfig sinkConfig, Map<String, Object> producerProps,
                                                          ErrorHandlingMetrics errorHandlingMetrics) {
         String topic = sinkConfig.dlqTopicName();
 
-        try (AdminClient admin = AdminClient.create(workerConfig.originals())) {
+        try (AdminClient admin = AdminClient.create(adminProps)) {
             if (!admin.listTopics().names().get().contains(topic)) {
                 log.error("Topic {} doesn't exist. Will attempt to create topic.", topic);
                 NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor());
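With the change above, the dead letter queue reporter builds its AdminClient from connector-scoped admin configs
rather than the raw worker config, so DLQ topic creation honours the same override policy. Assuming the
admin.override. prefix that KIP-458 assigns to admin client overrides, a sink connector could, for example,
shorten the request timeout used when creating the DLQ topic (provided the worker's policy permits it):

    # sink connector config (illustrative excerpt)
    errors.deadletterqueue.topic.name=my-connector-dlq
    admin.override.request.timeout.ms=15000
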
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 460df39..d8c4cca 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.isolation;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
@@ -72,6 +73,7 @@ public class DelegatingClassLoader extends URLClassLoader {
     private final SortedSet<PluginDesc<Transformation>> transformations;
     private final SortedSet<PluginDesc<ConfigProvider>> configProviders;
     private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
+    private final SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies;
     private final List<String> pluginPaths;
 
     private static final String MANIFEST_PREFIX = "META-INF/services/";
@@ -91,6 +93,7 @@ public class DelegatingClassLoader extends URLClassLoader {
         this.transformations = new TreeSet<>();
         this.configProviders = new TreeSet<>();
         this.restExtensions = new TreeSet<>();
+        this.connectorClientConfigPolicies = new TreeSet<>();
     }
 
     public DelegatingClassLoader(List<String> pluginPaths) {
@@ -125,6 +128,10 @@ public class DelegatingClassLoader extends URLClassLoader {
         return restExtensions;
     }
 
+    public Set<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies() {
+        return connectorClientConfigPolicies;
+    }
+
     public ClassLoader connectorLoader(Connector connector) {
         return connectorLoader(connector.getClass().getName());
     }
@@ -249,6 +256,8 @@ public class DelegatingClassLoader extends URLClassLoader {
             configProviders.addAll(plugins.configProviders());
             addPlugins(plugins.restExtensions(), loader);
             restExtensions.addAll(plugins.restExtensions());
+            addPlugins(plugins.connectorClientConfigPolicies(), loader);
+            connectorClientConfigPolicies.addAll(plugins.connectorClientConfigPolicies());
         }
 
         loadJdbcDrivers(loader);
@@ -304,7 +313,8 @@ public class DelegatingClassLoader extends URLClassLoader {
                 getPluginDesc(reflections, HeaderConverter.class, loader),
                 getPluginDesc(reflections, Transformation.class, loader),
                 getServiceLoaderPluginDesc(ConfigProvider.class, loader),
-                getServiceLoaderPluginDesc(ConnectRestExtension.class, loader)
+                getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
+                getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
         );
     }
 
@@ -371,6 +381,7 @@ public class DelegatingClassLoader extends URLClassLoader {
         addAliases(headerConverters);
         addAliases(transformations);
         addAliases(restExtensions);
+        addAliases(connectorClientConfigPolicies);
     }
 
     private <S> void addAliases(Collection<PluginDesc<S>> plugins) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
index ef077b3..e64a96c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -18,12 +18,15 @@ package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 
 public class PluginScanResult {
     private final Collection<PluginDesc<Connector>> connectors;
@@ -32,6 +35,9 @@ public class PluginScanResult {
     private final Collection<PluginDesc<Transformation>> transformations;
     private final Collection<PluginDesc<ConfigProvider>> configProviders;
     private final Collection<PluginDesc<ConnectRestExtension>> restExtensions;
+    private final Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies;
+
+    private final List<Collection> allPlugins;
 
     public PluginScanResult(
             Collection<PluginDesc<Connector>> connectors,
@@ -39,7 +45,8 @@ public class PluginScanResult {
             Collection<PluginDesc<HeaderConverter>> headerConverters,
             Collection<PluginDesc<Transformation>> transformations,
             Collection<PluginDesc<ConfigProvider>> configProviders,
-            Collection<PluginDesc<ConnectRestExtension>> restExtensions
+            Collection<PluginDesc<ConnectRestExtension>> restExtensions,
+            Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies
     ) {
         this.connectors = connectors;
         this.converters = converters;
@@ -47,6 +54,10 @@ public class PluginScanResult {
         this.transformations = transformations;
         this.configProviders = configProviders;
         this.restExtensions = restExtensions;
+        this.connectorClientConfigPolicies = connectorClientConfigPolicies;
+        this.allPlugins =
+            Arrays.asList(connectors, converters, headerConverters, transformations, configProviders,
+                          restExtensions, connectorClientConfigPolicies);
     }
 
     public Collection<PluginDesc<Connector>> connectors() {
@@ -73,12 +84,15 @@ public class PluginScanResult {
         return restExtensions;
     }
 
+    public Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies() {
+        return connectorClientConfigPolicies;
+    }
+
     public boolean isEmpty() {
-        return connectors().isEmpty()
-               && converters().isEmpty()
-               && headerConverters().isEmpty()
-               && transformations().isEmpty()
-               && configProviders().isEmpty()
-               && restExtensions().isEmpty();
+        return allPlugins.stream().allMatch(Collection::isEmpty);
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
index 2833b4c..8b42f59 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
@@ -34,6 +35,7 @@ public enum PluginType {
     TRANSFORMATION(Transformation.class),
     CONFIGPROVIDER(ConfigProvider.class),
     REST_EXTENSION(ConnectRestExtension.class),
+    CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY(ConnectorClientConfigOverridePolicy.class),
     UNKNOWN(Object.class);
 
     private Class<?> klass;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index 8d2a3ce..c029802 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -131,6 +131,7 @@ public class PluginUtils {
             + "|storage\\.StringConverter"
             + "|storage\\.SimpleHeaderConverter"
             + "|rest\\.basic\\.auth\\.extension\\.BasicAuthSecurityRestExtension"
+            + "|connector\\.policy\\..*"
             + ")"
             + "|common\\.config\\.provider\\.(?!ConfigProvider$).*"
             + ")$");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 148f818..e438db8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -149,6 +149,11 @@ public class Plugins {
     }
 
     public Connector newConnector(String connectorClassOrAlias) {
+        Class<? extends Connector> klass = connectorClass(connectorClassOrAlias);
+        return newPlugin(klass);
+    }
+
+    public Class<? extends Connector> connectorClass(String connectorClassOrAlias) {
         Class<? extends Connector> klass;
         try {
             klass = pluginClass(
@@ -188,7 +193,7 @@ public class Plugins {
             PluginDesc<Connector> entry = matches.get(0);
             klass = entry.pluginClass();
         }
-        return newPlugin(klass);
+        return klass;
     }
 
     public Task newTask(Class<? extends Task> taskClass) {
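Splitting connectorClass out of newConnector lets callers resolve a connector's class (needed for the policy
request) without instantiating it. On the worker side the pieces fit together roughly like the sketch below, which
uses the static config builders whose signatures appear in the WorkerTest changes further down (variable names
abbreviated, not verbatim from the patch):

    // Resolve the class once, then hand it to the policy-aware config builders.
    Class<? extends Connector> connClass =
        plugins.connectorClass(connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
    Map<String, Object> producerProps = Worker.producerConfigs(
        id, "connector-producer-" + id, config, connConfig, connClass, policy);
    Map<String, Object> consumerProps = Worker.consumerConfigs(
        id, config, connConfig, connClass, policy);
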
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 7b6d16a..00bda92 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.standalone;
 
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
@@ -62,12 +63,14 @@ public class StandaloneHerder extends AbstractHerder {
 
     private ClusterConfigState configState;
 
-    public StandaloneHerder(Worker worker, String kafkaClusterId) {
+    public StandaloneHerder(Worker worker, String kafkaClusterId,
+                            ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
         this(worker,
                 worker.workerId(),
                 kafkaClusterId,
                 new MemoryStatusBackingStore(),
-                new MemoryConfigBackingStore(worker.configTransformer()));
+                new MemoryConfigBackingStore(worker.configTransformer()),
+                connectorClientConfigOverridePolicy);
     }
 
     // visible for testing
@@ -75,8 +78,9 @@ public class StandaloneHerder extends AbstractHerder {
                      String workerId,
                      String kafkaClusterId,
                      StatusBackingStore statusBackingStore,
-                     MemoryConfigBackingStore configBackingStore) {
-        super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore);
+                     MemoryConfigBackingStore configBackingStore,
+                     ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+        super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy);
         this.configState = ClusterConfigState.EMPTY;
         this.requestExecutorService = Executors.newSingleThreadScheduledExecutor();
         configBackingStore.setUpdateListener(new ConfigUpdateListener());
diff --git a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
new file mode 100644
index 0000000..8b76ce4
--- /dev/null
+++ b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
@@ -0,0 +1,18 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
+org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy
+org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy
\ No newline at end of file
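Custom policies are discovered the same way as the built-ins registered above: through a ServiceLoader manifest
shipped inside the plugin jar. For the hypothetical com.example.policy.ClientIdOnlyOverridePolicy sketched
earlier, the jar would carry:

    # META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
    com.example.policy.ClientIdOnlyOverridePolicy
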
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java
new file mode 100644
index 0000000..28fee73
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.connect.health.ConnectorType;
+import org.apache.kafka.connect.runtime.WorkerTest;
+import org.junit.Assert;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class BaseConnectorClientConfigOverridePolicyTest {
+
+    protected abstract ConnectorClientConfigOverridePolicy policyToTest();
+
+    protected void testValidOverride(Map<String, Object> clientConfig) {
+        List<ConfigValue> configValues = configValues(clientConfig);
+        assertNoError(configValues);
+    }
+
+    protected void testInvalidOverride(Map<String, Object> clientConfig) {
+        List<ConfigValue> configValues = configValues(clientConfig);
+        assertError(configValues);
+    }
+
+    private List<ConfigValue> configValues(Map<String, Object> clientConfig) {
+        ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
+            "test",
+            ConnectorType.SOURCE,
+            WorkerTest.WorkerTestConnector.class,
+            clientConfig,
+            ConnectorClientConfigRequest.ClientType.PRODUCER);
+        return policyToTest().validate(connectorClientConfigRequest);
+    }
+
+    protected void assertNoError(List<ConfigValue> configValues) {
+        Assert.assertTrue(configValues.stream().allMatch(configValue -> configValue.errorMessages().isEmpty()));
+    }
+
+    protected void assertError(List<ConfigValue> configValues) {
+        Assert.assertTrue(configValues.stream().anyMatch(configValue -> !configValue.errorMessages().isEmpty()));
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/NoneConnectorClientConfigOverridePolicyTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/NoneConnectorClientConfigOverridePolicyTest.java
new file mode 100644
index 0000000..2c7b078
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/NoneConnectorClientConfigOverridePolicyTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class NoneConnectorClientConfigOverridePolicyTest extends BaseConnectorClientConfigOverridePolicyTest {
+
+    ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
+
+    @Test
+    public void testNoOverrides() {
+        testValidOverride(Collections.emptyMap());
+    }
+
+    @Test
+    public void testWithOverrides() {
+        Map<String, Object> clientConfig = new HashMap<>();
+        clientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, "test");
+        clientConfig.put(ProducerConfig.ACKS_CONFIG, "none");
+        testInvalidOverride(clientConfig);
+    }
+
+    @Override
+    protected ConnectorClientConfigOverridePolicy policyToTest() {
+        return noneConnectorClientConfigOverridePolicy;
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicyTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicyTest.java
new file mode 100644
index 0000000..0e79c8a
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicyTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PrincipalConnectorClientConfigOverridePolicyTest extends BaseConnectorClientConfigOverridePolicyTest {
+
+    ConnectorClientConfigOverridePolicy principalConnectorClientConfigOverridePolicy = new PrincipalConnectorClientConfigOverridePolicy();
+
+    @Test
+    public void testPrincipalOnly() {
+        Map<String, Object> clientConfig = Collections.singletonMap(SaslConfigs.SASL_JAAS_CONFIG, "test");
+        testValidOverride(clientConfig);
+    }
+
+    @Test
+    public void testPrincipalPlusOtherConfigs() {
+        Map<String, Object> clientConfig = new HashMap<>();
+        clientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, "test");
+        clientConfig.put(ProducerConfig.ACKS_CONFIG, "none");
+        testInvalidOverride(clientConfig);
+    }
+
+    @Override
+    protected ConnectorClientConfigOverridePolicy policyToTest() {
+        return principalConnectorClientConfigOverridePolicy;
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java
new file mode 100644
index 0000000..499916b
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@Category(IntegrationTest.class)
+public class ConnectorCientPolicyIntegrationTest {
+
+    private static final int NUM_TASKS = 1;
+    private static final int NUM_WORKERS = 1;
+    private static final String CONNECTOR_NAME = "simple-conn";
+
+    @After
+    public void close() {
+    }
+
+    @Test
+    public void testCreateWithOverridesForNonePolicy() throws Exception {
+        Map<String, String> props = basicConnectorConfig();
+        props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + SaslConfigs.SASL_JAAS_CONFIG, "sasl");
+        assertFailCreateConnector("None", props);
+    }
+
+    @Test
+    public void testCreateWithNotAllowedOverridesForPrincipalPolicy() throws Exception {
+        Map<String, String> props = basicConnectorConfig();
+        props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + SaslConfigs.SASL_JAAS_CONFIG, "sasl");
+        props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+        assertFailCreateConnector("Principal", props);
+    }
+
+    @Test
+    public void testCreateWithAllowedOverridesForPrincipalPolicy() throws Exception {
+        Map<String, String> props = basicConnectorConfig();
+        props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAIN");
+        assertPassCreateConnector("Principal", props);
+    }
+
+    @Test
+    public void testCreateWithAllowedOverridesForAllPolicy() throws Exception {
+        // set up props for the sink connector
+        Map<String, String> props = basicConnectorConfig();
+        props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.CLIENT_ID_CONFIG, "test");
+        assertPassCreateConnector("All", props);
+    }
+
+    private EmbeddedConnectCluster connectClusterWithPolicy(String policy) throws IOException {
+        // setup Connect worker properties
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
+        workerProps.put(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, policy);
+
+        // setup Kafka broker properties
+        Properties exampleBrokerProps = new Properties();
+        exampleBrokerProps.put("auto.create.topics.enable", "false");
+
+        // build a Connect cluster backed by Kafka and Zk
+        EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
+            .name("connect-cluster")
+            .numWorkers(NUM_WORKERS)
+            .numBrokers(1)
+            .workerProps(workerProps)
+            .brokerProps(exampleBrokerProps)
+            .build();
+
+        // start the clusters
+        connect.start();
+        return connect;
+    }
+
+    private void assertFailCreateConnector(String policy, Map<String, String> props) throws IOException {
+        EmbeddedConnectCluster connect = connectClusterWithPolicy(policy);
+        try {
+            connect.configureConnector(CONNECTOR_NAME, props);
+            fail("Shouldn't be able to create connector");
+        } catch (ConnectRestException e) {
+            assertEquals(400, e.statusCode());
+        } finally {
+            connect.stop();
+        }
+    }
+
+    private void assertPassCreateConnector(String policy, Map<String, String> props) throws IOException {
+        EmbeddedConnectCluster connect = connectClusterWithPolicy(policy);
+        try {
+            connect.configureConnector(CONNECTOR_NAME, props);
+        } catch (ConnectRestException e) {
+            fail("Should be able to create connector");
+        } finally {
+            connect.stop();
+        }
+    }
+
+    public Map<String, String> basicConnectorConfig() {
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put(TOPICS_CONFIG, "test-topic");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        return props;
+    }
+
+}
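The integration test drives these cases through the Connect REST API (configureConnector issues a PUT to
/connectors/{name}/config). For reference, a submission carrying a consumer override looks roughly like the
request below; the body is illustrative, and the prefix assumes the consumer.override. value that KIP-458 assigns
to CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX:

    PUT /connectors/simple-conn/config
    {
      "connector.class": "MonitorableSinkConnector",
      "tasks.max": "1",
      "topics": "test-topic",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "consumer.override.auto.offset.reset": "latest"
    }

Under the None policy this request is rejected with HTTP 400; under All it is accepted, and under Principal only
principal-related keys (such as sasl.jaas.config) would pass.
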
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index f7ee8a6..35c0dd2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -116,6 +121,7 @@ public class AbstractHerderTest {
     private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
     private final int generation = 5;
     private final String connector = "connector";
+    private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
 
     @MockStrict private Worker worker;
     @MockStrict private WorkerConfigTransformer transformer;
@@ -132,9 +138,10 @@ public class AbstractHerderTest {
                 String.class,
                 String.class,
                 StatusBackingStore.class,
-                ConfigBackingStore.class
+                ConfigBackingStore.class,
+                ConnectorClientConfigOverridePolicy.class
             )
-            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
+            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
             .addMockedMethod("generation")
             .createMock();
 
@@ -155,9 +162,10 @@ public class AbstractHerderTest {
                 String.class,
                 String.class,
                 StatusBackingStore.class,
-                ConfigBackingStore.class
+                ConfigBackingStore.class,
+                ConnectorClientConfigOverridePolicy.class
             )
-            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
+            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
             .addMockedMethod("generation")
             .createMock();
 
@@ -179,8 +187,9 @@ public class AbstractHerderTest {
         ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
 
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
-                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
+                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
+                                 ConnectorClientConfigOverridePolicy.class)
+                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
                 .addMockedMethod("generation")
                 .createMock();
 
@@ -219,8 +228,9 @@ public class AbstractHerderTest {
         String workerId = "workerId";
 
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
-                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
+                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
+                                 ConnectorClientConfigOverridePolicy.class)
+                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
                 .addMockedMethod("generation")
                 .createMock();
 
@@ -253,7 +263,7 @@ public class AbstractHerderTest {
 
     @Test(expected = BadRequestException.class)
     public void testConfigValidationEmptyConfig() {
-        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
+        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
         replayAll();
 
         herder.validateConnectorConfig(new HashMap<String, String>());
@@ -263,7 +273,7 @@ public class AbstractHerderTest {
 
     @Test()
     public void testConfigValidationMissingName() {
-        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
+        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
         replayAll();
 
         Map<String, String> config = Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
@@ -288,7 +298,7 @@ public class AbstractHerderTest {
 
     @Test(expected = ConfigException.class)
     public void testConfigValidationInvalidTopics() {
-        AbstractHerder herder = createConfigValidationHerder(TestSinkConnector.class);
+        AbstractHerder herder = createConfigValidationHerder(TestSinkConnector.class, noneConnectorClientConfigOverridePolicy);
         replayAll();
 
         Map<String, String> config = new HashMap<>();
@@ -303,7 +313,7 @@ public class AbstractHerderTest {
 
     @Test()
     public void testConfigValidationTransformsExtendResults() {
-        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
+        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
 
         // 2 transform aliases defined -> 2 plugin lookups
         Set<PluginDesc<Transformation>> transformations = new HashSet<>();
@@ -349,6 +359,46 @@ public class AbstractHerderTest {
         verifyAll();
     }
 
+    @Test()
+    public void testConfigValidationPrincipalOnlyOverride() {
+        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, new PrincipalConnectorClientConfigOverridePolicy());
+        replayAll();
+
+        // Define 2 producer overrides. The Principal policy permits only the sasl.jaas.config override, so the
+        // acks override should generate an error.
+        Map<String, String> config = new HashMap<>();
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
+        config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
+        config.put("required", "value"); // connector required config
+        String ackConfigKey = ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + ProducerConfig.ACKS_CONFIG;
+        String saslConfigKey = ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + SaslConfigs.SASL_JAAS_CONFIG;
+        config.put(ackConfigKey, "none");
+        config.put(saslConfigKey, "jaas_config");
+
+        ConfigInfos result = herder.validateConnectorConfig(config);
+        assertEquals(ConnectorType.SOURCE, herder.connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
+
+        // We expect an error because the Principal policy does not allow the acks override. Note that these assertions depend
+        // heavily on the config fields for SourceConnectorConfig, but we expect these to change rarely.
+        assertEquals(TestSourceConnector.class.getName(), result.name());
+        // The standard connector config groups are present even though no transforms are defined
+        List<String> expectedGroups = Arrays.asList(
+            ConnectorConfig.COMMON_GROUP,
+            ConnectorConfig.TRANSFORMS_GROUP,
+            ConnectorConfig.ERROR_GROUP
+        );
+        assertEquals(expectedGroups, result.groups());
+        assertEquals(1, result.errorCount());
+        // The base connector config has 13 fields, the connector's configs add 2, and the 2 producer overrides add 2 more
+        assertEquals(17, result.values().size());
+        assertTrue(result.values().stream().anyMatch(
+            configInfo -> ackConfigKey.equals(configInfo.configValue().name()) && !configInfo.configValue().errors().isEmpty()));
+        assertTrue(result.values().stream().anyMatch(
+            configInfo -> saslConfigKey.equals(configInfo.configValue().name()) && configInfo.configValue().errors().isEmpty()));
+
+        verifyAll();
+    }
+
     @Test
     public void testReverseTransformConfigs() {
         // Construct a task config with constant values for TEST_KEY and TEST_KEY2
@@ -372,15 +422,17 @@ public class AbstractHerderTest {
         assertFalse(reverseTransformed.get(0).containsKey(TEST_KEY3));
     }
 
-    private AbstractHerder createConfigValidationHerder(Class<? extends Connector> connectorClass) {
+    private AbstractHerder createConfigValidationHerder(Class<? extends Connector> connectorClass,
+                                                        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
 
 
         ConfigBackingStore configStore = strictMock(ConfigBackingStore.class);
         StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
 
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
-                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
+                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
+                                 ConnectorClientConfigOverridePolicy.class)
+                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, connectorClientConfigOverridePolicy)
                 .addMockedMethod("generation")
                 .createMock();
         EasyMock.expect(herder.generation()).andStubReturn(generation);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 586587b..9cb83eb 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -29,6 +29,9 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -60,6 +63,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
 import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.api.easymock.annotation.MockNice;
 import org.powermock.api.easymock.annotation.MockStrict;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -89,6 +93,8 @@ public class WorkerTest extends ThreadedTest {
     private static final String CONNECTOR_ID = "test-connector";
     private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
     private static final String WORKER_ID = "localhost:8083";
+    private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
+    private final ConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
 
     private Map<String, String> workerProps = new HashMap<>();
     private WorkerConfig config;
@@ -120,6 +126,7 @@ public class WorkerTest extends ThreadedTest {
     @Mock private Converter taskValueConverter;
     @Mock private HeaderConverter taskHeaderConverter;
     @Mock private ExecutorService executorService;
+    @MockNice private ConnectorConfig connectorConfig;
 
     @Before
     public void setup() {
@@ -200,7 +207,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
         worker.start();
 
         assertEquals(Collections.emptySet(), worker.connectorNames());
@@ -251,7 +258,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
         worker.start();
 
         assertStatistics(worker, 0, 0);
@@ -311,7 +318,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
         worker.start();
 
         assertStatistics(worker, 0, 0);
@@ -375,7 +382,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
         worker.start();
 
         assertStatistics(worker, 0, 0);
@@ -401,7 +408,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
         worker.start();
 
         worker.stopConnector(CONNECTOR_ID);
@@ -458,7 +465,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
         worker.start();
 
         assertStatistics(worker, 0, 0);
@@ -550,7 +557,6 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
         EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
                 .andReturn(pluginLoader);
-
         EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader)
                 .times(2);
 
@@ -558,7 +564,8 @@ public class WorkerTest extends ThreadedTest {
 
         EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader)
                 .times(2);
-
+        plugins.connectorClass(WorkerTestConnector.class.getName());
+        EasyMock.expectLastCall().andReturn(WorkerTestConnector.class);
         // Remove
         workerTask.stop();
         EasyMock.expectLastCall();
@@ -569,7 +576,8 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
+                            noneConnectorClientConfigOverridePolicy);
         worker.start();
         assertStatistics(worker, 0, 0);
         assertStartupStatistics(worker, 0, 0, 0, 0);
@@ -620,7 +628,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
         worker.start();
         assertStatistics(worker, 0, 0);
         assertStartupStatistics(worker, 0, 0, 0, 0);
@@ -699,7 +707,8 @@ public class WorkerTest extends ThreadedTest {
 
         EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader)
                 .times(2);
-
+        plugins.connectorClass(WorkerTestConnector.class.getName());
+        EasyMock.expectLastCall().andReturn(WorkerTestConnector.class);
         // Remove on Worker.stop()
         workerTask.stop();
         EasyMock.expectLastCall();
@@ -712,7 +721,8 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
+                            noneConnectorClientConfigOverridePolicy);
         worker.start();
         assertStatistics(worker, 0, 0);
         worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
@@ -791,6 +801,8 @@ public class WorkerTest extends ThreadedTest {
 
         EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader)
                 .times(2);
+        plugins.connectorClass(WorkerTestConnector.class.getName());
+        EasyMock.expectLastCall().andReturn(WorkerTestConnector.class);
 
         // Remove
         workerTask.stop();
@@ -802,7 +814,8 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
+                            noneConnectorClientConfigOverridePolicy);
         worker.start();
         assertStatistics(worker, 0, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
@@ -828,9 +841,13 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testProducerConfigsWithoutOverrides() {
+        EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).andReturn(
+            new HashMap<>());
+        PowerMock.replayAll();
         Map<String, String> expectedConfigs = new HashMap<>(defaultProducerConfigs);
         expectedConfigs.put("client.id", "connector-producer-job-0");
-        assertEquals(expectedConfigs, Worker.producerConfigs("connector-producer-" + TASK_ID, config));
+        assertEquals(expectedConfigs,
+                     Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, config, connectorConfig, null, noneConnectorClientConfigOverridePolicy));
     }
 
     @Test
@@ -845,7 +862,34 @@ public class WorkerTest extends ThreadedTest {
         expectedConfigs.put("acks", "-1");
         expectedConfigs.put("linger.ms", "1000");
         expectedConfigs.put("client.id", "producer-test-id");
-        assertEquals(expectedConfigs, Worker.producerConfigs("connector-producer-" + TASK_ID, configWithOverrides));
+        EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).andReturn(
+            new HashMap<>());
+        PowerMock.replayAll();
+        assertEquals(expectedConfigs,
+                     Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy));
+    }
+
+    @Test
+    public void testProducerConfigsWithClientOverrides() {
+        Map<String, String> props = new HashMap<>(workerProps);
+        props.put("producer.acks", "-1");
+        props.put("producer.linger.ms", "1000");
+        props.put("producer.client.id", "producer-test-id");
+        WorkerConfig configWithOverrides = new StandaloneConfig(props);
+
+        Map<String, String> expectedConfigs = new HashMap<>(defaultProducerConfigs);
+        expectedConfigs.put("acks", "-1");
+        expectedConfigs.put("linger.ms", "5000");
+        expectedConfigs.put("batch.size", "1000");
+        expectedConfigs.put("client.id", "producer-test-id");
+        Map<String, Object> connConfig = new HashMap<>();
+        connConfig.put("linger.ms", "5000");
+        connConfig.put("batch.size", "1000");
+        EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX))
+            .andReturn(connConfig);
+        PowerMock.replayAll();
+        assertEquals(expectedConfigs,
+                     Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy));
     }
 
     @Test
@@ -853,7 +897,10 @@ public class WorkerTest extends ThreadedTest {
         Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs);
         expectedConfigs.put("group.id", "connect-test");
         expectedConfigs.put("client.id", "connector-consumer-test-1");
-        assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), config));
+        EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new HashMap<>());
+        PowerMock.replayAll();
+        assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), config, connectorConfig,
+                                                             null, noneConnectorClientConfigOverridePolicy));
     }
 
     @Test
@@ -869,9 +916,95 @@ public class WorkerTest extends ThreadedTest {
         expectedConfigs.put("auto.offset.reset", "latest");
         expectedConfigs.put("max.poll.records", "1000");
         expectedConfigs.put("client.id", "consumer-test-id");
-        assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides));
+        EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new HashMap<>());
+        PowerMock.replayAll();
+        assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
+                                                             null, noneConnectorClientConfigOverridePolicy));
+
     }
 
+    @Test
+    public void testConsumerConfigsWithClientOverrides() {
+        Map<String, String> props = new HashMap<>(workerProps);
+        props.put("consumer.auto.offset.reset", "latest");
+        props.put("consumer.max.poll.records", "5000");
+        WorkerConfig configWithOverrides = new StandaloneConfig(props);
+
+        Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs);
+        expectedConfigs.put("group.id", "connect-test");
+        expectedConfigs.put("auto.offset.reset", "latest");
+        expectedConfigs.put("max.poll.records", "5000");
+        expectedConfigs.put("max.poll.interval.ms", "1000");
+        expectedConfigs.put("client.id", "connector-consumer-test-1");
+        Map<String, Object> connConfig = new HashMap<String, Object>();
+        connConfig.put("max.poll.records", "5000");
+        connConfig.put("max.poll.interval.ms", "1000");
+        EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX))
+            .andReturn(connConfig);
+        PowerMock.replayAll();
+        assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
+                                                             null, allConnectorClientConfigOverridePolicy));
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testConsumerConfigsClientOverridesWithNonePolicy() {
+        Map<String, String> props = new HashMap<>(workerProps);
+        props.put("consumer.auto.offset.reset", "latest");
+        props.put("consumer.max.poll.records", "5000");
+        WorkerConfig configWithOverrides = new StandaloneConfig(props);
+
+        Map<String, Object> connConfig = new HashMap<String, Object>();
+        connConfig.put("max.poll.records", "5000");
+        connConfig.put("max.poll.interval.ms", "1000");
+        EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX))
+            .andReturn(connConfig);
+        PowerMock.replayAll();
+        Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
+                               null, noneConnectorClientConfigOverridePolicy);
+    }
+
+    @Test
+    public void testAdminConfigsClientOverridesWithAllPolicy() {
+        Map<String, String> props = new HashMap<>(workerProps);
+        props.put("admin.client.id", "testid");
+        props.put("admin.metadata.max.age.ms", "5000");
+        WorkerConfig configWithOverrides = new StandaloneConfig(props);
+
+        Map<String, Object> connConfig = new HashMap<String, Object>();
+        connConfig.put("metadata.max.age.ms", "10000");
+
+        Map<String, String> expectedConfigs = new HashMap<>();
+        expectedConfigs.put("bootstrap.servers", "localhost:9092");
+        expectedConfigs.put("client.id", "testid");
+        expectedConfigs.put("metadata.max.age.ms", "10000");
+
+        EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX))
+            .andReturn(connConfig);
+        PowerMock.replayAll();
+        assertEquals(expectedConfigs, Worker.adminConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
+                                                             null, allConnectorClientConfigOverridePolicy));
+
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testAdminConfigsClientOverridesWithNonePolicy() {
+        Map<String, String> props = new HashMap<>(workerProps);
+        props.put("admin.client.id", "testid");
+        props.put("admin.metadata.max.age.ms", "5000");
+        WorkerConfig configWithOverrides = new StandaloneConfig(props);
+
+        Map<String, Object> connConfig = new HashMap<String, Object>();
+        connConfig.put("metadata.max.age.ms", "10000");
+
+        EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX))
+            .andReturn(connConfig);
+        PowerMock.replayAll();
+        Worker.adminConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
+                                                          null, noneConnectorClientConfigOverridePolicy);
+
+    }
+
+
     private void assertStatistics(Worker worker, int connectors, int tasks) {
         MetricGroup workerMetrics = worker.workerMetricsGroup().metricGroup();
         assertEquals(connectors, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-count"), 0.0001d);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index b381263..b03ddf3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
@@ -177,6 +179,8 @@ public class DistributedHerderTest {
     private SinkConnectorConfig conn1SinkConfig;
     private SinkConnectorConfig conn1SinkConfigUpdated;
     private short connectProtocolVersion;
+    private final ConnectorClientConfigOverridePolicy
+        noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
 
     @Before
     public void setUp() throws Exception {
@@ -191,7 +195,7 @@ public class DistributedHerderTest {
         herder = PowerMock.createPartialMock(DistributedHerder.class,
                 new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
-                statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time);
+                statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy);
 
         configUpdateListener = herder.new ConfigUpdateListener();
         rebalanceListener = herder.new RebalanceListener(time);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 8aa1c70..e020cee 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
@@ -111,11 +113,15 @@ public class StandaloneHerderTest {
     @Mock protected Callback<Herder.Created<ConnectorInfo>> createCallback;
     @Mock protected StatusBackingStore statusBackingStore;
 
+    private final ConnectorClientConfigOverridePolicy
+        noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
+
+
     @Before
     public void setup() {
         worker = PowerMock.createMock(Worker.class);
         herder = PowerMock.createPartialMock(StandaloneHerder.class, new String[]{"connectorTypeForClass"},
-            worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer));
+            worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy);
         plugins = PowerMock.createMock(Plugins.class);
         pluginLoader = PowerMock.createMock(PluginClassLoader.class);
         delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index e610812..07c2755 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -362,13 +362,14 @@ public class EmbeddedConnectCluster {
         try (OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream())) {
             out.write(body);
         }
-        try (InputStream is = httpCon.getInputStream()) {
-            int c;
-            StringBuilder response = new StringBuilder();
-            while ((c = is.read()) != -1) {
-                response.append((char) c);
+        if (httpCon.getResponseCode() < HttpURLConnection.HTTP_BAD_REQUEST) {
+            try (InputStream is = httpCon.getInputStream()) {
+                log.info("PUT response for URL={} is {}", url, responseToString(is));
+            }
+        } else {
+            try (InputStream is = httpCon.getErrorStream()) {
+                log.info("PUT error response for URL={} is {}", url, responseToString(is));
             }
-            log.info("Put response for URL={} is {}", url, response);
         }
         return httpCon.getResponseCode();
     }
@@ -392,7 +393,7 @@ public class EmbeddedConnectCluster {
             while ((c = is.read()) != -1) {
                 response.append((char) c);
             }
-            log.debug("Get response for URL={} is {}", url, response);
+            log.debug("GET response for URL={} is {}", url, response);
             return response.toString();
         } catch (IOException e) {
             Response.Status status = Response.Status.fromStatusCode(httpCon.getResponseCode());
@@ -413,6 +414,15 @@ public class EmbeddedConnectCluster {
         return httpCon.getResponseCode();
     }
 
+    private String responseToString(InputStream stream) throws IOException {
+        int c;
+        StringBuilder response = new StringBuilder();
+        while ((c = stream.read()) != -1) {
+            response.append((char) c);
+        }
+        return response.toString();
+    }
+
     public static class Builder {
         private String name = UUID.randomUUID().toString();
         private Map<String, String> workerProps = new HashMap<>();