You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/05/17 08:37:55 UTC
[kafka] branch trunk updated: KAFKA-8265: Initial implementation
for ConnectorClientConfigPolicy to enable overrides (KIP-458) (#6624)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2e91a31 KAFKA-8265: Initial implementation for ConnectorClientConfigPolicy to enable overrides (KIP-458) (#6624)
2e91a31 is described below
commit 2e91a310d7bf9e7d4d46b0bc0ca0c11cb4531e10
Author: Magesh Nandakumar <ma...@gmail.com>
AuthorDate: Fri May 17 01:37:32 2019 -0700
KAFKA-8265: Initial implementation for ConnectorClientConfigPolicy to enable overrides (KIP-458) (#6624)
Implementation to enable policy for Connector Client config overrides. This is
implemented per the KIP-458.
Reviewers: Randall Hauch <rh...@gmail.com>
---
checkstyle/import-control.xml | 8 +
checkstyle/suppressions.xml | 3 +-
.../kafka/clients/admin/AdminClientConfig.java | 4 +
.../kafka/clients/consumer/ConsumerConfig.java | 4 +
.../kafka/clients/producer/ProducerConfig.java | 4 +
.../ConnectorClientConfigOverridePolicy.java | 47 ++++++
.../policy/ConnectorClientConfigRequest.java | 101 +++++++++++++
.../kafka/connect/cli/ConnectDistributed.java | 10 +-
.../kafka/connect/cli/ConnectStandalone.java | 10 +-
...bstractConnectorClientConfigOverridePolicy.java | 57 +++++++
.../AllConnectorClientConfigOverridePolicy.java | 46 ++++++
.../NoneConnectorClientConfigOverridePolicy.java | 47 ++++++
...incipalConnectorClientConfigOverridePolicy.java | 57 +++++++
.../kafka/connect/runtime/AbstractHerder.java | 113 +++++++++++++-
.../kafka/connect/runtime/ConnectorConfig.java | 5 +
.../org/apache/kafka/connect/runtime/Worker.java | 117 ++++++++++++---
.../apache/kafka/connect/runtime/WorkerConfig.java | 12 +-
.../runtime/distributed/DistributedHerder.java | 12 +-
.../runtime/errors/DeadLetterQueueReporter.java | 5 +-
.../runtime/isolation/DelegatingClassLoader.java | 13 +-
.../runtime/isolation/PluginScanResult.java | 28 +++-
.../connect/runtime/isolation/PluginType.java | 2 +
.../connect/runtime/isolation/PluginUtils.java | 1 +
.../kafka/connect/runtime/isolation/Plugins.java | 7 +-
.../runtime/standalone/StandaloneHerder.java | 12 +-
...ctor.policy.ConnectorClientConfigOverridePolicy | 18 +++
...aseConnectorClientConfigOverridePolicyTest.java | 59 ++++++++
...oneConnectorClientConfigOverridePolicyTest.java | 49 ++++++
...palConnectorClientConfigOverridePolicyTest.java | 50 ++++++
.../ConnectorCientPolicyIntegrationTest.java | 146 ++++++++++++++++++
.../kafka/connect/runtime/AbstractHerderTest.java | 82 ++++++++--
.../apache/kafka/connect/runtime/WorkerTest.java | 167 ++++++++++++++++++---
.../runtime/distributed/DistributedHerderTest.java | 6 +-
.../runtime/standalone/StandaloneHerderTest.java | 8 +-
.../util/clusters/EmbeddedConnectCluster.java | 24 ++-
35 files changed, 1246 insertions(+), 88 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 67cbe5a..a76fd1d 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -322,6 +322,13 @@
<allow pkg="org.apache.kafka.connect.storage" />
</subpackage>
+ <subpackage name="connector.policy">
+ <allow pkg="org.apache.kafka.connect.health" />
+ <allow pkg="org.apache.kafka.connect.connector" />
+ <!-- for testing -->
+ <allow pkg="org.apache.kafka.connect.runtime" />
+ </subpackage>
+
<subpackage name="rest">
<allow pkg="org.apache.kafka.connect.health" />
<allow pkg="javax.ws.rs" />
@@ -356,6 +363,7 @@
<allow pkg="org.apache.kafka.connect.storage" />
<allow pkg="org.apache.kafka.connect.util" />
<allow pkg="org.apache.kafka.common" />
+ <allow pkg="org.apache.kafka.connect.connector.policy" />
</subpackage>
<subpackage name="storage">
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0c65edc..977a7ac 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -81,7 +81,8 @@
<!-- Connect -->
<suppress checks="ClassFanOutComplexity"
files="DistributedHerder(|Test).java"/>
-
+ <suppress checks="ClassFanOutComplexity"
+ files="Worker.java"/>
<suppress checks="MethodLength"
files="(KafkaConfigBackingStore|RequestResponseTest|WorkerSinkTaskTest).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 47c76ac..1fa2335 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -200,6 +200,10 @@ public class AdminClientConfig extends AbstractConfig {
return CONFIG.names();
}
+ public static ConfigDef configDef() {
+ return CONFIG;
+ }
+
public static void main(String[] args) {
System.out.println(CONFIG.toHtmlTable());
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index c9b5004..ba1928e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -531,6 +531,10 @@ public class ConsumerConfig extends AbstractConfig {
return CONFIG.names();
}
+ public static ConfigDef configDef() {
+ return CONFIG;
+ }
+
public static void main(String[] args) {
System.out.println(CONFIG.toHtmlTable());
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 758f858..396d91e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -404,6 +404,10 @@ public class ProducerConfig extends AbstractConfig {
return CONFIG.names();
}
+ public static ConfigDef configDef() {
+ return CONFIG;
+ }
+
public static void main(String[] args) {
System.out.println(CONFIG.toHtmlTable());
}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java
new file mode 100644
index 0000000..94e5fd6
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.ConfigValue;
+
+import java.util.List;
+
+/**
+ * <p>An interface for enforcing a policy on overriding of client configs via the connector configs.
+ *
+ * <p>Common use cases are ability to provide principal per connector, <code>sasl.jaas.config</code>
+ * and/or enforcing that the producer/consumer configurations for optimizations are within acceptable ranges.
+ */
+public interface ConnectorClientConfigOverridePolicy extends Configurable, AutoCloseable {
+
+
+ /**
+ * Worker will invoke this while constructing the producer for the SourceConnectors, DLQ for SinkConnectors and the consumer for the
+ * SinkConnectors to validate if all of the overridden client configurations are allowed per the
+ * policy implementation. This would also be invoked during the validate of connector configs via the Rest API.
+ *
+ * If there are any policy violations, the connector will not be started.
+ *
+ * @param connectorClientConfigRequest an instance of {@code ConnectorClientConfigRequest} that provides the configs to overridden and
+ * its context; never {@code null}
+ * @return list of {@link ConfigValue} instances that describe each client configuration in the request and includes an
+ {@link ConfigValue#errorMessages error} if the configuration is not allowed by the policy; never null
+ */
+ List<ConfigValue> validate(ConnectorClientConfigRequest connectorClientConfigRequest);
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigRequest.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigRequest.java
new file mode 100644
index 0000000..11f756b
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigRequest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.health.ConnectorType;
+
+import java.util.Map;
+
+public class ConnectorClientConfigRequest {
+
+ private Map<String, Object> clientProps;
+ private ClientType clientType;
+ private String connectorName;
+ private ConnectorType connectorType;
+ private Class<? extends Connector> connectorClass;
+
+ public ConnectorClientConfigRequest(
+ String connectorName,
+ ConnectorType connectorType,
+ Class<? extends Connector> connectorClass,
+ Map<String, Object> clientProps,
+ ClientType clientType) {
+ this.clientProps = clientProps;
+ this.clientType = clientType;
+ this.connectorName = connectorName;
+ this.connectorType = connectorType;
+ this.connectorClass = connectorClass;
+ }
+
+ /**
+ * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SOURCE}.
+ * Provides Config with prefix {@code consumer.override.} for {@link ConnectorType#SINK}.
+ * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SINK} for DLQ.
+ * Provides Config with prefix {@code admin.override.} for {@link ConnectorType#SINK} for DLQ.
+ *
+ * @return The client properties specified in the Connector Config with prefix {@code producer.override.} ,
+ * {@code consumer.override.} and {@code admin.override.}. The configs don't include the prefixes.
+ */
+ public Map<String, Object> clientProps() {
+ return clientProps;
+ }
+
+ /**
+ * {@link ClientType#PRODUCER} for {@link ConnectorType#SOURCE}
+ * {@link ClientType#CONSUMER} for {@link ConnectorType#SINK}
+ * {@link ClientType#PRODUCER} for DLQ in {@link ConnectorType#SINK}
+ * {@link ClientType#ADMIN} for DLQ Topic Creation in {@link ConnectorType#SINK}
+ *
+ * @return enumeration specifying the client type that is being overriden by the worker; never null.
+ */
+ public ClientType clientType() {
+ return clientType;
+ }
+
+ /**
+ * Name of the connector specified in the connector config.
+ *
+ * @return name of the connector; never null.
+ */
+ public String connectorName() {
+ return connectorName;
+ }
+
+ /**
+ * Type of the Connector.
+ *
+ * @return enumeration specifying the type of the connector {@link ConnectorType#SINK} or {@link ConnectorType#SOURCE}.
+ */
+ public ConnectorType connectorType() {
+ return connectorType;
+ }
+
+ /**
+ * The class of the Connector.
+ *
+ * @return the class of the Connector being created; never null
+ */
+ public Class<? extends Connector> connectorClass() {
+ return connectorClass;
+ }
+
+ public enum ClientType {
+ PRODUCER, CONSUMER, ADMIN;
+ }
+}
\ No newline at end of file
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 17d65ac..22c1ad8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -19,8 +19,10 @@ package org.apache.kafka.connect.cli;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
@@ -102,7 +104,11 @@ public class ConnectDistributed {
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
offsetBackingStore.configure(config);
- Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
+ config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
+ config, ConnectorClientConfigOverridePolicy.class);
+
+ Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
WorkerConfigTransformer configTransformer = worker.configTransformer();
Converter internalValueConverter = worker.getInternalValueConverter();
@@ -116,7 +122,7 @@ public class ConnectDistributed {
DistributedHerder herder = new DistributedHerder(config, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
- advertisedUrl.toString());
+ advertisedUrl.toString(), connectorClientConfigOverridePolicy);
final Connect connect = new Connect(herder, rest);
log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 499e6df..cf7b93b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -19,10 +19,12 @@ package org.apache.kafka.connect.cli;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestServer;
@@ -87,9 +89,13 @@ public class ConnectStandalone {
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
- Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore());
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
+ config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
+ config, ConnectorClientConfigOverridePolicy.class);
+ Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore(),
+ connectorClientConfigOverridePolicy);
- Herder herder = new StandaloneHerder(worker, kafkaClusterId);
+ Herder herder = new StandaloneHerder(worker, kafkaClusterId, connectorClientConfigOverridePolicy);
final Connect connect = new Connect(herder, rest);
log.info("Kafka Connect standalone worker initialization took {}ms", time.hiResClockMs() - initStart);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
new file mode 100644
index 0000000..3c310db
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.common.config.ConfigValue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class AbstractConnectorClientConfigOverridePolicy implements ConnectorClientConfigOverridePolicy {
+
+ @Override
+ public void close() throws Exception {
+
+ }
+
+ @Override
+ public final List<ConfigValue> validate(ConnectorClientConfigRequest connectorClientConfigRequest) {
+ Map<String, Object> inputConfig = connectorClientConfigRequest.clientProps();
+ return inputConfig.entrySet().stream().map(this::configValue).collect(Collectors.toList());
+ }
+
+ protected ConfigValue configValue(Map.Entry<String, Object> configEntry) {
+ ConfigValue configValue =
+ new ConfigValue(configEntry.getKey(), configEntry.getValue(), new ArrayList<>(), new ArrayList<String>());
+ validate(configValue);
+ return configValue;
+ }
+
+ protected void validate(ConfigValue configValue) {
+ if (!isAllowed(configValue)) {
+ configValue.addErrorMessage("The '" + policyName() + "' policy does not allow '" + configValue.name()
+ + "' to be overridden in the connector configuration.");
+ }
+ }
+
+ protected abstract String policyName();
+
+ protected abstract boolean isAllowed(ConfigValue configValue);
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllConnectorClientConfigOverridePolicy.java b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllConnectorClientConfigOverridePolicy.java
new file mode 100644
index 0000000..ecd76ba
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllConnectorClientConfigOverridePolicy.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.common.config.ConfigValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Allows all client configurations to be overridden via the connector configs by setting {@code client.config.policy} to {@code All}
+ */
+public class AllConnectorClientConfigOverridePolicy extends AbstractConnectorClientConfigOverridePolicy {
+ private static final Logger log = LoggerFactory.getLogger(AllConnectorClientConfigOverridePolicy.class);
+
+ @Override
+ protected String policyName() {
+ return "All";
+ }
+
+ @Override
+ protected boolean isAllowed(ConfigValue configValue) {
+ return true;
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ log.info("Setting up All Policy for ConnectorClientConfigOverride. This will allow all client configurations to be overridden");
+ }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/NoneConnectorClientConfigOverridePolicy.java b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/NoneConnectorClientConfigOverridePolicy.java
new file mode 100644
index 0000000..8236c89
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/NoneConnectorClientConfigOverridePolicy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.common.config.ConfigValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Disallow any client configuration to be overridden via the connector configs by setting {@code client.config.policy} to {@code None}.
+ * This is the default behavior.
+ */
+public class NoneConnectorClientConfigOverridePolicy extends AbstractConnectorClientConfigOverridePolicy {
+ private static final Logger log = LoggerFactory.getLogger(NoneConnectorClientConfigOverridePolicy.class);
+
+ @Override
+ protected String policyName() {
+ return "None";
+ }
+
+ @Override
+ protected boolean isAllowed(ConfigValue configValue) {
+ return false;
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ log.info("Setting up None Policy for ConnectorClientConfigOverride. This will disallow any client configuration to be overridden");
+ }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java
new file mode 100644
index 0000000..7d80d80
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Allows all {@code sasl} configurations to be overridden via the connector configs by setting {@code client.config.policy} to
+ * {@code Principal}. This allows to set a principal per connector.
+ */
+public class PrincipalConnectorClientConfigOverridePolicy extends AbstractConnectorClientConfigOverridePolicy {
+ private static final Logger log = LoggerFactory.getLogger(PrincipalConnectorClientConfigOverridePolicy.class);
+
+ private static final Set<String> ALLOWED_CONFIG =
+ Stream.of(SaslConfigs.SASL_JAAS_CONFIG, SaslConfigs.SASL_MECHANISM, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG).
+ collect(Collectors.toSet());
+
+ @Override
+ protected String policyName() {
+ return "Principal";
+ }
+
+ @Override
+ protected boolean isAllowed(ConfigValue configValue) {
+ return ALLOWED_CONFIG.contains(configValue.name());
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ log.info("Setting up Principal policy for ConnectorClientConfigOverride. This will allow `sasl` client configuration to be "
+ + "overridden.");
+ }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 8e7d016..e92f55e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.ConfigKey;
@@ -23,6 +25,8 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -87,6 +91,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
private final String kafkaClusterId;
protected final StatusBackingStore statusBackingStore;
protected final ConfigBackingStore configBackingStore;
+ private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
@@ -94,13 +99,15 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
String workerId,
String kafkaClusterId,
StatusBackingStore statusBackingStore,
- ConfigBackingStore configBackingStore) {
+ ConfigBackingStore configBackingStore,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
this.worker = worker;
this.worker.herder = this;
this.workerId = workerId;
this.kafkaClusterId = kafkaClusterId;
this.statusBackingStore = statusBackingStore;
this.configBackingStore = configBackingStore;
+ this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
}
@Override
@@ -280,14 +287,17 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
throw new BadRequestException("Connector config " + connectorProps + " contains no connector type");
Connector connector = getConnector(connType);
+ org.apache.kafka.connect.health.ConnectorType connectorType;
ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
try {
ConfigDef baseConfigDef;
if (connector instanceof SourceConnector) {
baseConfigDef = SourceConnectorConfig.configDef();
+ connectorType = org.apache.kafka.connect.health.ConnectorType.SOURCE;
} else {
baseConfigDef = SinkConnectorConfig.configDef();
SinkConnectorConfig.validate(connectorProps);
+ connectorType = org.apache.kafka.connect.health.ConnectorType.SINK;
}
ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(), baseConfigDef, connectorProps, false);
Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(
@@ -321,12 +331,105 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
configKeys.putAll(configDef.configKeys());
allGroups.addAll(configDef.groups());
configValues.addAll(config.configValues());
- return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
+ ConfigInfos configInfos = generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
+
+ AbstractConfig connectorConfig = new AbstractConfig(new ConfigDef(), connectorProps);
+ String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG);
+ ConfigInfos producerConfigInfos = null;
+ ConfigInfos consumerConfigInfos = null;
+ ConfigInfos adminConfigInfos = null;
+ if (connectorType.equals(org.apache.kafka.connect.health.ConnectorType.SOURCE)) {
+ producerConfigInfos = validateClientOverrides(connName,
+ ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
+ connectorConfig,
+ ProducerConfig.configDef(),
+ connector.getClass(),
+ connectorType,
+ ConnectorClientConfigRequest.ClientType.PRODUCER,
+ connectorClientConfigOverridePolicy);
+ return mergeConfigInfos(connType, configInfos, producerConfigInfos);
+ } else {
+ consumerConfigInfos = validateClientOverrides(connName,
+ ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
+ connectorConfig,
+ ProducerConfig.configDef(),
+ connector.getClass(),
+ connectorType,
+ ConnectorClientConfigRequest.ClientType.CONSUMER,
+ connectorClientConfigOverridePolicy);
+ // check if topic for dead letter queue exists
+ String topic = connectorProps.get(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG);
+ if (topic != null && !topic.isEmpty()) {
+ adminConfigInfos = validateClientOverrides(connName,
+ ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
+ connectorConfig,
+ ProducerConfig.configDef(),
+ connector.getClass(),
+ connectorType,
+ ConnectorClientConfigRequest.ClientType.ADMIN,
+ connectorClientConfigOverridePolicy);
+ }
+
+ }
+ return mergeConfigInfos(connType, configInfos, producerConfigInfos, consumerConfigInfos, adminConfigInfos);
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
}
}
+ private static ConfigInfos mergeConfigInfos(String connType, ConfigInfos... configInfosList) {
+ int errorCount = 0;
+ List<ConfigInfo> configInfoList = new LinkedList<>();
+ Set<String> groups = new LinkedHashSet<>();
+ for (ConfigInfos configInfos : configInfosList) {
+ if (configInfos != null) {
+ errorCount += configInfos.errorCount();
+ configInfoList.addAll(configInfos.values());
+ groups.addAll(configInfos.groups());
+ }
+ }
+ return new ConfigInfos(connType, errorCount, new ArrayList<>(groups), configInfoList);
+ }
+
+ private static ConfigInfos validateClientOverrides(String connName,
+ String prefix,
+ AbstractConfig connectorConfig,
+ ConfigDef configDef,
+ Class<? extends Connector> connectorClass,
+ org.apache.kafka.connect.health.ConnectorType connectorType,
+ ConnectorClientConfigRequest.ClientType clientType,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+ int errorCount = 0;
+ List<ConfigInfo> configInfoList = new LinkedList<>();
+ Map<String, ConfigKey> configKeys = configDef.configKeys();
+ Set<String> groups = new LinkedHashSet<>();
+ Map<String, Object> clientConfigs = connectorConfig.originalsWithPrefix(prefix);
+ ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
+ connName, connectorType, connectorClass, clientConfigs, clientType);
+ List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
+ if (configValues != null) {
+ for (ConfigValue validatedConfigValue : configValues) {
+ ConfigKey configKey = configKeys.get(validatedConfigValue.name());
+ ConfigKeyInfo configKeyInfo = null;
+ if (configKey != null) {
+ if (configKey.group != null) {
+ groups.add(configKey.group);
+ }
+ configKeyInfo = convertConfigKey(configKey, prefix);
+ }
+
+ ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(),
+ validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
+ if (configValue.errorMessages().size() > 0) {
+ errorCount++;
+ }
+ ConfigValueInfo configValueInfo = convertConfigValue(configValue, configKey != null ? configKey.type : null);
+ configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo));
+ }
+ }
+ return new ConfigInfos(connectorClass.toString(), errorCount, new ArrayList<>(groups), configInfoList);
+ }
+
// public for testing
public static ConfigInfos generateResult(String connType, Map<String, ConfigKey> configKeys, List<ConfigValue> configValues, List<String> groups) {
int errorCount = 0;
@@ -358,7 +461,11 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
}
private static ConfigKeyInfo convertConfigKey(ConfigKey configKey) {
- String name = configKey.name;
+ return convertConfigKey(configKey, "");
+ }
+
+ private static ConfigKeyInfo convertConfigKey(ConfigKey configKey, String prefix) {
+ String name = prefix + configKey.name;
Type type = configKey.type;
String typeName = configKey.type.name();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 8889aad..1cebc94 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -140,6 +140,11 @@ public class ConnectorConfig extends AbstractConfig {
"a failure. This is 'false' by default, which will prevent record keys, values, and headers from being written to log files, " +
"although some information such as topic and partition number will still be logged.";
+
+ public static final String CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX = "producer.overrides.";
+ public static final String CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX = "consumer.overrides.";
+ public static final String CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX = "admin.overrides.";
+
private final EnrichedConnectorConfig enrichedConfig;
private static class EnrichedConnectorConfig extends AbstractConfig {
EnrichedConnectorConfig(ConfigDef configDef, Map<String, String> props) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index f6f7452..f848a18 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Frequencies;
@@ -30,7 +31,10 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
@@ -68,6 +72,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
/**
@@ -98,15 +103,16 @@ public class Worker {
private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
private WorkerConfigTransformer workerConfigTransformer;
+ private ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
public Worker(
- String workerId,
- Time time,
- Plugins plugins,
- WorkerConfig config,
- OffsetBackingStore offsetBackingStore
- ) {
- this(workerId, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool());
+ String workerId,
+ Time time,
+ Plugins plugins,
+ WorkerConfig config,
+ OffsetBackingStore offsetBackingStore,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+ this(workerId, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool(), connectorClientConfigOverridePolicy);
}
@SuppressWarnings("deprecation")
@@ -116,7 +122,8 @@ public class Worker {
Plugins plugins,
WorkerConfig config,
OffsetBackingStore offsetBackingStore,
- ExecutorService executorService
+ ExecutorService executorService,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy
) {
this.metrics = new ConnectMetrics(workerId, config, time);
this.executor = executorService;
@@ -124,6 +131,7 @@ public class Worker {
this.time = time;
this.plugins = plugins;
this.config = config;
+ this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
this.workerMetricsGroup = new WorkerMetricsGroup(metrics);
// Internal converters are required properties, thus getClass won't return null.
@@ -486,7 +494,8 @@ public class Worker {
HeaderConverter headerConverter,
ClassLoader loader) {
ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
-
+ final Class<? extends Connector> connectorClass = plugins.connectorClass(
+ connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(),
connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM);
retryWithToleranceOperator.metrics(errorHandlingMetrics);
@@ -500,7 +509,8 @@ public class Worker {
internalKeyConverter, internalValueConverter);
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
- Map<String, Object> producerProps = producerConfigs("connector-producer-" + id, config);
+ Map<String, Object> producerProps = producerConfigs(id, "connector-producer-" + id, config, connConfig, connectorClass,
+ connectorClientConfigOverridePolicy);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
// Note we pass the configState as it performs dynamic transformations under the covers
@@ -511,9 +521,9 @@ public class Worker {
TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
- retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics));
+ retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
- Map<String, Object> consumerProps = consumerConfigs(id, config);
+ Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
@@ -525,7 +535,12 @@ public class Worker {
}
}
- static Map<String, Object> producerConfigs(String defaultClientId, WorkerConfig config) {
+ static Map<String, Object> producerConfigs(ConnectorTaskId id,
+ String defaultClientId,
+ WorkerConfig config,
+ ConnectorConfig connConfig,
+ Class<? extends Connector> connectorClass,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
@@ -540,11 +555,22 @@ public class Worker {
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, defaultClientId);
// User-specified overrides
producerProps.putAll(config.originalsWithPrefix("producer."));
+
+ // Connector-specified overrides
+ Map<String, Object> producerOverrides =
+ connectorClientConfigOverrides(id, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
+ ConnectorType.SOURCE, ConnectorClientConfigRequest.ClientType.PRODUCER,
+ connectorClientConfigOverridePolicy);
+ producerProps.putAll(producerOverrides);
+
return producerProps;
}
-
- static Map<String, Object> consumerConfigs(ConnectorTaskId id, WorkerConfig config) {
+ static Map<String, Object> consumerConfigs(ConnectorTaskId id,
+ WorkerConfig config,
+ ConnectorConfig connConfig,
+ Class<? extends Connector> connectorClass,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
// Include any unknown worker configs so consumer configs can be set globally on the worker
// and through to the task
Map<String, Object> consumerProps = new HashMap<>();
@@ -559,15 +585,68 @@ public class Worker {
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.putAll(config.originalsWithPrefix("consumer."));
+ // Connector-specified overrides
+ Map<String, Object> consumerOverrides =
+ connectorClientConfigOverrides(id, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
+ ConnectorType.SINK, ConnectorClientConfigRequest.ClientType.CONSUMER,
+ connectorClientConfigOverridePolicy);
+ consumerProps.putAll(consumerOverrides);
+
return consumerProps;
}
+ static Map<String, Object> adminConfigs(ConnectorTaskId id,
+ WorkerConfig config,
+ ConnectorConfig connConfig,
+ Class<? extends Connector> connectorClass,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+ Map<String, Object> adminProps = new HashMap<>();
+ adminProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+ // User-specified overrides
+ adminProps.putAll(config.originalsWithPrefix("admin."));
+
+ // Connector-specified overrides
+ Map<String, Object> adminOverrides =
+ connectorClientConfigOverrides(id, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
+ ConnectorType.SINK, ConnectorClientConfigRequest.ClientType.ADMIN,
+ connectorClientConfigOverridePolicy);
+ adminProps.putAll(adminOverrides);
+
+ return adminProps;
+ }
+
+ private static Map<String, Object> connectorClientConfigOverrides(ConnectorTaskId id,
+ ConnectorConfig connConfig,
+ Class<? extends Connector> connectorClass,
+ String clientConfigPrefix,
+ ConnectorType connectorType,
+ ConnectorClientConfigRequest.ClientType clientType,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+ Map<String, Object> clientOverrides = connConfig.originalsWithPrefix(clientConfigPrefix);
+ ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
+ id.connector(),
+ connectorType,
+ connectorClass,
+ clientOverrides,
+ clientType
+ );
+ List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
+ List<ConfigValue> errorConfigs = configValues.stream().
+ filter(configValue -> configValue.errorMessages().size() > 0).collect(Collectors.toList());
+ // These should be caught when the herder validates the connector configuration, but just in case
+ if (errorConfigs.size() > 0) {
+ throw new ConnectException("Client Config Overrides not allowed " + errorConfigs);
+ }
+ return clientOverrides;
+ }
+
ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
return new ErrorHandlingMetrics(id, metrics);
}
private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, SinkConnectorConfig connConfig,
- ErrorHandlingMetrics errorHandlingMetrics) {
+ ErrorHandlingMetrics errorHandlingMetrics,
+ Class<? extends Connector> connectorClass) {
ArrayList<ErrorReporter> reporters = new ArrayList<>();
LogReporter logReporter = new LogReporter(id, connConfig, errorHandlingMetrics);
reporters.add(logReporter);
@@ -575,8 +654,10 @@ public class Worker {
// check if topic for dead letter queue exists
String topic = connConfig.dlqTopicName();
if (topic != null && !topic.isEmpty()) {
- Map<String, Object> producerProps = producerConfigs("connector-dlq-producer-" + id, config);
- DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps, errorHandlingMetrics);
+ Map<String, Object> producerProps = producerConfigs(id, "connector-dlq-producer-" + id, config, connConfig, connectorClass,
+ connectorClientConfigOverridePolicy);
+ Map<String, Object> adminProps = adminConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
+ DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(adminProps, id, connConfig, producerProps, errorHandlingMetrics);
reporters.add(reporter);
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index efa92b5..773358f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -212,6 +212,14 @@ public class WorkerConfig extends AbstractConfig {
+ "<code>ConnectRestExtension</code> allows you to inject into Connect's REST API user defined resources like filters. "
+ "Typically used to add custom capability like logging, security, etc. ";
+ public static final String CONNECTOR_CLIENT_POLICY_CLASS_CONFIG = "client.config.policy";
+ public static final String CONNECTOR_CLIENT_POLICY_CLASS_DOC =
+ "Class name or alias of implementation of <code>ConnectorClientConfigOverridePolicy</code>. Defines what client configurations can be "
+ + "overriden by the connector. The default implementation is `None`. The other possible policies in the framework include `All` "
+ + "and `Principal`. ";
+ public static final String CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT = "None";
+
+
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
@@ -289,7 +297,9 @@ public class WorkerConfig extends AbstractConfig {
Collections.emptyList(),
Importance.LOW, CONFIG_PROVIDERS_DOC)
.define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
- Importance.LOW, REST_EXTENSION_CLASSES_DOC);
+ Importance.LOW, REST_EXTENSION_CLASSES_DOC)
+ .define(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, Type.STRING, CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT,
+ Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC);
}
private void logInternalConverterDeprecationWarnings(Map<String, String> props) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 94ca734..585836e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
@@ -165,8 +166,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
String kafkaClusterId,
StatusBackingStore statusBackingStore,
ConfigBackingStore configBackingStore,
- String restUrl) {
- this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(), time);
+ String restUrl,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+ this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(),
+ time, connectorClientConfigOverridePolicy);
configBackingStore.setUpdateListener(new ConfigUpdateListener());
}
@@ -180,8 +183,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
WorkerGroupMember member,
String restUrl,
ConnectMetrics metrics,
- Time time) {
- super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore);
+ Time time,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+ super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy);
this.time = time;
this.herderMetrics = new HerderMetrics(metrics);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 2312269..c78026f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
-import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,13 +70,13 @@ public class DeadLetterQueueReporter implements ErrorReporter {
private KafkaProducer<byte[], byte[]> kafkaProducer;
- public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
+ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminProps,
ConnectorTaskId id,
SinkConnectorConfig sinkConfig, Map<String, Object> producerProps,
ErrorHandlingMetrics errorHandlingMetrics) {
String topic = sinkConfig.dlqTopicName();
- try (AdminClient admin = AdminClient.create(workerConfig.originals())) {
+ try (AdminClient admin = AdminClient.create(adminProps)) {
if (!admin.listTopics().names().get().contains(topic)) {
log.error("Topic {} doesn't exist. Will attempt to create topic.", topic);
NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 460df39..d8c4cca 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
@@ -72,6 +73,7 @@ public class DelegatingClassLoader extends URLClassLoader {
private final SortedSet<PluginDesc<Transformation>> transformations;
private final SortedSet<PluginDesc<ConfigProvider>> configProviders;
private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
+ private final SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies;
private final List<String> pluginPaths;
private static final String MANIFEST_PREFIX = "META-INF/services/";
@@ -91,6 +93,7 @@ public class DelegatingClassLoader extends URLClassLoader {
this.transformations = new TreeSet<>();
this.configProviders = new TreeSet<>();
this.restExtensions = new TreeSet<>();
+ this.connectorClientConfigPolicies = new TreeSet<>();
}
public DelegatingClassLoader(List<String> pluginPaths) {
@@ -125,6 +128,10 @@ public class DelegatingClassLoader extends URLClassLoader {
return restExtensions;
}
+ public Set<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies() {
+ return connectorClientConfigPolicies;
+ }
+
public ClassLoader connectorLoader(Connector connector) {
return connectorLoader(connector.getClass().getName());
}
@@ -249,6 +256,8 @@ public class DelegatingClassLoader extends URLClassLoader {
configProviders.addAll(plugins.configProviders());
addPlugins(plugins.restExtensions(), loader);
restExtensions.addAll(plugins.restExtensions());
+ addPlugins(plugins.connectorClientConfigPolicies(), loader);
+ connectorClientConfigPolicies.addAll(plugins.connectorClientConfigPolicies());
}
loadJdbcDrivers(loader);
@@ -304,7 +313,8 @@ public class DelegatingClassLoader extends URLClassLoader {
getPluginDesc(reflections, HeaderConverter.class, loader),
getPluginDesc(reflections, Transformation.class, loader),
getServiceLoaderPluginDesc(ConfigProvider.class, loader),
- getServiceLoaderPluginDesc(ConnectRestExtension.class, loader)
+ getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
+ getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
);
}
@@ -371,6 +381,7 @@ public class DelegatingClassLoader extends URLClassLoader {
addAliases(headerConverters);
addAliases(transformations);
addAliases(restExtensions);
+ addAliases(connectorClientConfigPolicies);
}
private <S> void addAliases(Collection<PluginDesc<S>> plugins) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
index ef077b3..e64a96c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -18,12 +18,15 @@ package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
public class PluginScanResult {
private final Collection<PluginDesc<Connector>> connectors;
@@ -32,6 +35,9 @@ public class PluginScanResult {
private final Collection<PluginDesc<Transformation>> transformations;
private final Collection<PluginDesc<ConfigProvider>> configProviders;
private final Collection<PluginDesc<ConnectRestExtension>> restExtensions;
+ private final Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies;
+
+ private final List<Collection> allPlugins;
public PluginScanResult(
Collection<PluginDesc<Connector>> connectors,
@@ -39,7 +45,8 @@ public class PluginScanResult {
Collection<PluginDesc<HeaderConverter>> headerConverters,
Collection<PluginDesc<Transformation>> transformations,
Collection<PluginDesc<ConfigProvider>> configProviders,
- Collection<PluginDesc<ConnectRestExtension>> restExtensions
+ Collection<PluginDesc<ConnectRestExtension>> restExtensions,
+ Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies
) {
this.connectors = connectors;
this.converters = converters;
@@ -47,6 +54,10 @@ public class PluginScanResult {
this.transformations = transformations;
this.configProviders = configProviders;
this.restExtensions = restExtensions;
+ this.connectorClientConfigPolicies = connectorClientConfigPolicies;
+ this.allPlugins =
+ Arrays.asList(connectors, converters, headerConverters, transformations, configProviders,
+ connectorClientConfigPolicies);
}
public Collection<PluginDesc<Connector>> connectors() {
@@ -73,12 +84,15 @@ public class PluginScanResult {
return restExtensions;
}
+ public Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies() {
+ return connectorClientConfigPolicies;
+ }
+
public boolean isEmpty() {
- return connectors().isEmpty()
- && converters().isEmpty()
- && headerConverters().isEmpty()
- && transformations().isEmpty()
- && configProviders().isEmpty()
- && restExtensions().isEmpty();
+ boolean isEmpty = true;
+ for (Collection plugins : allPlugins) {
+ isEmpty = isEmpty && plugins.isEmpty();
+ }
+ return isEmpty;
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
index 2833b4c..8b42f59 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
@@ -34,6 +35,7 @@ public enum PluginType {
TRANSFORMATION(Transformation.class),
CONFIGPROVIDER(ConfigProvider.class),
REST_EXTENSION(ConnectRestExtension.class),
+ CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY(ConnectorClientConfigOverridePolicy.class),
UNKNOWN(Object.class);
private Class<?> klass;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index 8d2a3ce..c029802 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -131,6 +131,7 @@ public class PluginUtils {
+ "|storage\\.StringConverter"
+ "|storage\\.SimpleHeaderConverter"
+ "|rest\\.basic\\.auth\\.extension\\.BasicAuthSecurityRestExtension"
+ + "|connector\\.policy\\..*"
+ ")"
+ "|common\\.config\\.provider\\.(?!ConfigProvider$).*"
+ ")$");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 148f818..e438db8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -149,6 +149,11 @@ public class Plugins {
}
public Connector newConnector(String connectorClassOrAlias) {
+ Class<? extends Connector> klass = connectorClass(connectorClassOrAlias);
+ return newPlugin(klass);
+ }
+
+ public Class<? extends Connector> connectorClass(String connectorClassOrAlias) {
Class<? extends Connector> klass;
try {
klass = pluginClass(
@@ -188,7 +193,7 @@ public class Plugins {
PluginDesc<Connector> entry = matches.get(0);
klass = entry.pluginClass();
}
- return newPlugin(klass);
+ return klass;
}
public Task newTask(Class<? extends Task> taskClass) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 7b6d16a..00bda92 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime.standalone;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
@@ -62,12 +63,14 @@ public class StandaloneHerder extends AbstractHerder {
private ClusterConfigState configState;
- public StandaloneHerder(Worker worker, String kafkaClusterId) {
+ public StandaloneHerder(Worker worker, String kafkaClusterId,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
this(worker,
worker.workerId(),
kafkaClusterId,
new MemoryStatusBackingStore(),
- new MemoryConfigBackingStore(worker.configTransformer()));
+ new MemoryConfigBackingStore(worker.configTransformer()),
+ connectorClientConfigOverridePolicy);
}
// visible for testing
@@ -75,8 +78,9 @@ public class StandaloneHerder extends AbstractHerder {
String workerId,
String kafkaClusterId,
StatusBackingStore statusBackingStore,
- MemoryConfigBackingStore configBackingStore) {
- super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore);
+ MemoryConfigBackingStore configBackingStore,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+ super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy);
this.configState = ClusterConfigState.EMPTY;
this.requestExecutorService = Executors.newSingleThreadScheduledExecutor();
configBackingStore.setUpdateListener(new ConfigUpdateListener());
diff --git a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
new file mode 100644
index 0000000..8b76ce4
--- /dev/null
+++ b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
@@ -0,0 +1,18 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
+org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy
+org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy
\ No newline at end of file
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java
new file mode 100644
index 0000000..28fee73
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.connect.health.ConnectorType;
+import org.apache.kafka.connect.runtime.WorkerTest;
+import org.junit.Assert;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class BaseConnectorClientConfigOverridePolicyTest {
+
+ protected abstract ConnectorClientConfigOverridePolicy policyToTest();
+
+ protected void testValidOverride(Map<String, Object> clientConfig) {
+ List<ConfigValue> configValues = configValues(clientConfig);
+ assertNoError(configValues);
+ }
+
+ protected void testInvalidOverride(Map<String, Object> clientConfig) {
+ List<ConfigValue> configValues = configValues(clientConfig);
+ assertError(configValues);
+ }
+
+ private List<ConfigValue> configValues(Map<String, Object> clientConfig) {
+ ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
+ "test",
+ ConnectorType.SOURCE,
+ WorkerTest.WorkerTestConnector.class,
+ clientConfig,
+ ConnectorClientConfigRequest.ClientType.PRODUCER);
+ return policyToTest().validate(connectorClientConfigRequest);
+ }
+
+ protected void assertNoError(List<ConfigValue> configValues) {
+ Assert.assertTrue(configValues.stream().allMatch(configValue -> configValue.errorMessages().size() == 0));
+ }
+
+ protected void assertError(List<ConfigValue> configValues) {
+ Assert.assertTrue(configValues.stream().anyMatch(configValue -> configValue.errorMessages().size() > 0));
+ }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/NoneConnectorClientConfigOverridePolicyTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/NoneConnectorClientConfigOverridePolicyTest.java
new file mode 100644
index 0000000..2c7b078
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/NoneConnectorClientConfigOverridePolicyTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class NoneConnectorClientConfigOverridePolicyTest extends BaseConnectorClientConfigOverridePolicyTest {
+
+ ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
+
+ @Test
+ public void testNoOverrides() {
+ testValidOverride(Collections.emptyMap());
+ }
+
+ @Test
+ public void testWithOverrides() {
+ Map<String, Object> clientConfig = new HashMap<>();
+ clientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, "test");
+ clientConfig.put(ProducerConfig.ACKS_CONFIG, "none");
+ testInvalidOverride(clientConfig);
+ }
+
+ @Override
+ protected ConnectorClientConfigOverridePolicy policyToTest() {
+ return noneConnectorClientConfigOverridePolicy;
+ }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicyTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicyTest.java
new file mode 100644
index 0000000..0e79c8a
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicyTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PrincipalConnectorClientConfigOverridePolicyTest extends BaseConnectorClientConfigOverridePolicyTest {
+
+ ConnectorClientConfigOverridePolicy principalConnectorClientConfigOverridePolicy = new PrincipalConnectorClientConfigOverridePolicy();
+
+ @Test
+ public void testPrincipalOnly() {
+ Map<String, Object> clientConfig = Collections.singletonMap(SaslConfigs.SASL_JAAS_CONFIG, "test");
+ testValidOverride(clientConfig);
+ }
+
+ @Test
+ public void testPrincipalPlusOtherConfigs() {
+ Map<String, Object> clientConfig = new HashMap<>();
+ clientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, "test");
+ clientConfig.put(ProducerConfig.ACKS_CONFIG, "none");
+ testInvalidOverride(clientConfig);
+ }
+
+ @Override
+ protected ConnectorClientConfigOverridePolicy policyToTest() {
+ return principalConnectorClientConfigOverridePolicy;
+ }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java
new file mode 100644
index 0000000..499916b
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@Category(IntegrationTest.class)
+public class ConnectorCientPolicyIntegrationTest {
+
+ private static final int NUM_TASKS = 1;
+ private static final int NUM_WORKERS = 1;
+ private static final String CONNECTOR_NAME = "simple-conn";
+
+
+ @After
+ public void close() {
+ }
+
+ @Test
+ public void testCreateWithOverridesForNonePolicy() throws Exception {
+ Map<String, String> props = basicConnectorConfig();
+ props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + SaslConfigs.SASL_JAAS_CONFIG, "sasl");
+ assertFailCreateConnector("None", props);
+ }
+
+ @Test
+ public void testCreateWithNotAllowedOverridesForPrincipalPolicy() throws Exception {
+ Map<String, String> props = basicConnectorConfig();
+ props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + SaslConfigs.SASL_JAAS_CONFIG, "sasl");
+ props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ assertFailCreateConnector("Principal", props);
+ }
+
+ @Test
+ public void testCreateWithAllowedOverridesForPrincipalPolicy() throws Exception {
+ Map<String, String> props = basicConnectorConfig();
+ props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAIN");
+ assertPassCreateConnector("Principal", props);
+ }
+
+ @Test
+ public void testCreateWithAllowedOverridesForAllPolicy() throws Exception {
+ // setup up props for the sink connector
+ Map<String, String> props = basicConnectorConfig();
+ props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.CLIENT_ID_CONFIG, "test");
+ assertPassCreateConnector("All", props);
+ }
+
+ private EmbeddedConnectCluster connectClusterWithPolicy(String policy) throws IOException {
+ // setup Connect worker properties
+ Map<String, String> workerProps = new HashMap<>();
+ workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
+ workerProps.put(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, policy);
+
+ // setup Kafka broker properties
+ Properties exampleBrokerProps = new Properties();
+ exampleBrokerProps.put("auto.create.topics.enable", "false");
+
+ // build a Connect cluster backed by Kafka and Zk
+ EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
+ .name("connect-cluster")
+ .numWorkers(NUM_WORKERS)
+ .numBrokers(1)
+ .workerProps(workerProps)
+ .brokerProps(exampleBrokerProps)
+ .build();
+
+ // start the clusters
+ connect.start();
+ return connect;
+ }
+
+ private void assertFailCreateConnector(String policy, Map<String, String> props) throws IOException {
+ EmbeddedConnectCluster connect = connectClusterWithPolicy(policy);
+ try {
+ connect.configureConnector(CONNECTOR_NAME, props);
+ fail("Shouldn't be able to create connector");
+ } catch (ConnectRestException e) {
+ assertEquals(e.statusCode(), 400);
+ } finally {
+ connect.stop();
+ }
+ }
+
+ private void assertPassCreateConnector(String policy, Map<String, String> props) throws IOException {
+ EmbeddedConnectCluster connect = connectClusterWithPolicy(policy);
+ try {
+ connect.configureConnector(CONNECTOR_NAME, props);
+ } catch (ConnectRestException e) {
+ fail("Should be able to create connector");
+ } finally {
+ connect.stop();
+ }
+ }
+
+
+ public Map<String, String> basicConnectorConfig() {
+ Map<String, String> props = new HashMap<>();
+ props.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName());
+ props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+ props.put(TOPICS_CONFIG, "test-topic");
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ return props;
+ }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index f7ee8a6..35c0dd2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -16,10 +16,15 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -116,6 +121,7 @@ public class AbstractHerderTest {
private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
private final int generation = 5;
private final String connector = "connector";
+ private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
@MockStrict private Worker worker;
@MockStrict private WorkerConfigTransformer transformer;
@@ -132,9 +138,10 @@ public class AbstractHerderTest {
String.class,
String.class,
StatusBackingStore.class,
- ConfigBackingStore.class
+ ConfigBackingStore.class,
+ ConnectorClientConfigOverridePolicy.class
)
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
+ .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
.addMockedMethod("generation")
.createMock();
@@ -155,9 +162,10 @@ public class AbstractHerderTest {
String.class,
String.class,
StatusBackingStore.class,
- ConfigBackingStore.class
+ ConfigBackingStore.class,
+ ConnectorClientConfigOverridePolicy.class
)
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
+ .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
.addMockedMethod("generation")
.createMock();
@@ -179,8 +187,9 @@ public class AbstractHerderTest {
ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
+ .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
+ ConnectorClientConfigOverridePolicy.class)
+ .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
.addMockedMethod("generation")
.createMock();
@@ -219,8 +228,9 @@ public class AbstractHerderTest {
String workerId = "workerId";
AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
+ .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
+ ConnectorClientConfigOverridePolicy.class)
+ .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
.addMockedMethod("generation")
.createMock();
@@ -253,7 +263,7 @@ public class AbstractHerderTest {
@Test(expected = BadRequestException.class)
public void testConfigValidationEmptyConfig() {
- AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
+ AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
replayAll();
herder.validateConnectorConfig(new HashMap<String, String>());
@@ -263,7 +273,7 @@ public class AbstractHerderTest {
@Test()
public void testConfigValidationMissingName() {
- AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
+ AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
replayAll();
Map<String, String> config = Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
@@ -288,7 +298,7 @@ public class AbstractHerderTest {
@Test(expected = ConfigException.class)
public void testConfigValidationInvalidTopics() {
- AbstractHerder herder = createConfigValidationHerder(TestSinkConnector.class);
+ AbstractHerder herder = createConfigValidationHerder(TestSinkConnector.class, noneConnectorClientConfigOverridePolicy);
replayAll();
Map<String, String> config = new HashMap<>();
@@ -303,7 +313,7 @@ public class AbstractHerderTest {
@Test()
public void testConfigValidationTransformsExtendResults() {
- AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
+ AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
// 2 transform aliases defined -> 2 plugin lookups
Set<PluginDesc<Transformation>> transformations = new HashSet<>();
@@ -349,6 +359,46 @@ public class AbstractHerderTest {
verifyAll();
}
+ @Test()
+ public void testConfigValidationPrincipalOnlyOverride() {
+ AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, new PrincipalConnectorClientConfigOverridePolicy());
+ replayAll();
+
+ // Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing
+ // class info that should generate an error.
+ Map<String, String> config = new HashMap<>();
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
+ config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
+ config.put("required", "value"); // connector required config
+ String ackConfigKey = ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + ProducerConfig.ACKS_CONFIG;
+ String saslConfigKey = ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + SaslConfigs.SASL_JAAS_CONFIG;
+ config.put(ackConfigKey, "none");
+ config.put(saslConfigKey, "jaas_config");
+
+ ConfigInfos result = herder.validateConnectorConfig(config);
+ assertEquals(herder.connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)), ConnectorType.SOURCE);
+
+ // We expect there to be errors due to now allowed override policy for ACKS.... Note that these assertions depend heavily on
+ // the config fields for SourceConnectorConfig, but we expect these to change rarely.
+ assertEquals(TestSourceConnector.class.getName(), result.name());
+ // Each transform also gets its own group
+ List<String> expectedGroups = Arrays.asList(
+ ConnectorConfig.COMMON_GROUP,
+ ConnectorConfig.TRANSFORMS_GROUP,
+ ConnectorConfig.ERROR_GROUP
+ );
+ assertEquals(expectedGroups, result.groups());
+ assertEquals(1, result.errorCount());
+ // Base connector config has 13 fields, connector's configs add 2, and 2 producer overrides
+ assertEquals(17, result.values().size());
+ assertTrue(result.values().stream().anyMatch(
+ configInfo -> ackConfigKey.equals(configInfo.configValue().name()) && !configInfo.configValue().errors().isEmpty()));
+ assertTrue(result.values().stream().anyMatch(
+ configInfo -> saslConfigKey.equals(configInfo.configValue().name()) && configInfo.configValue().errors().isEmpty()));
+
+ verifyAll();
+ }
+
@Test
public void testReverseTransformConfigs() {
// Construct a task config with constant values for TEST_KEY and TEST_KEY2
@@ -372,15 +422,17 @@ public class AbstractHerderTest {
assertFalse(reverseTransformed.get(0).containsKey(TEST_KEY3));
}
- private AbstractHerder createConfigValidationHerder(Class<? extends Connector> connectorClass) {
+ private AbstractHerder createConfigValidationHerder(Class<? extends Connector> connectorClass,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
ConfigBackingStore configStore = strictMock(ConfigBackingStore.class);
StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
+ .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
+ ConnectorClientConfigOverridePolicy.class)
+ .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, connectorClientConfigOverridePolicy)
.addMockedMethod("generation")
.createMock();
EasyMock.expect(herder.generation()).andStubReturn(generation);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 586587b..9cb83eb 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -29,6 +29,9 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
@@ -60,6 +63,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.api.easymock.annotation.MockNice;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -89,6 +93,8 @@ public class WorkerTest extends ThreadedTest {
private static final String CONNECTOR_ID = "test-connector";
private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
private static final String WORKER_ID = "localhost:8083";
+ private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
+ private final ConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
private Map<String, String> workerProps = new HashMap<>();
private WorkerConfig config;
@@ -120,6 +126,7 @@ public class WorkerTest extends ThreadedTest {
@Mock private Converter taskValueConverter;
@Mock private HeaderConverter taskHeaderConverter;
@Mock private ExecutorService executorService;
+ @MockNice private ConnectorConfig connectorConfig;
@Before
public void setup() {
@@ -200,7 +207,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
worker.start();
assertEquals(Collections.emptySet(), worker.connectorNames());
@@ -251,7 +258,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
worker.start();
assertStatistics(worker, 0, 0);
@@ -311,7 +318,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
worker.start();
assertStatistics(worker, 0, 0);
@@ -375,7 +382,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
worker.start();
assertStatistics(worker, 0, 0);
@@ -401,7 +408,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
worker.start();
worker.stopConnector(CONNECTOR_ID);
@@ -458,7 +465,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
worker.start();
assertStatistics(worker, 0, 0);
@@ -550,7 +557,6 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
.andReturn(pluginLoader);
-
EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader)
.times(2);
@@ -558,7 +564,8 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader)
.times(2);
-
+ plugins.connectorClass(WorkerTestConnector.class.getName());
+ EasyMock.expectLastCall().andReturn(WorkerTestConnector.class);
// Remove
workerTask.stop();
EasyMock.expectLastCall();
@@ -569,7 +576,8 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
+ noneConnectorClientConfigOverridePolicy);
worker.start();
assertStatistics(worker, 0, 0);
assertStartupStatistics(worker, 0, 0, 0, 0);
@@ -620,7 +628,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
worker.start();
assertStatistics(worker, 0, 0);
assertStartupStatistics(worker, 0, 0, 0, 0);
@@ -699,7 +707,8 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader)
.times(2);
-
+ plugins.connectorClass(WorkerTestConnector.class.getName());
+ EasyMock.expectLastCall().andReturn(WorkerTestConnector.class);
// Remove on Worker.stop()
workerTask.stop();
EasyMock.expectLastCall();
@@ -712,7 +721,8 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
+ noneConnectorClientConfigOverridePolicy);
worker.start();
assertStatistics(worker, 0, 0);
worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
@@ -791,6 +801,8 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader)
.times(2);
+ plugins.connectorClass(WorkerTestConnector.class.getName());
+ EasyMock.expectLastCall().andReturn(WorkerTestConnector.class);
// Remove
workerTask.stop();
@@ -802,7 +814,8 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
+ noneConnectorClientConfigOverridePolicy);
worker.start();
assertStatistics(worker, 0, 0);
assertEquals(Collections.emptySet(), worker.taskIds());
@@ -828,9 +841,13 @@ public class WorkerTest extends ThreadedTest {
@Test
public void testProducerConfigsWithoutOverrides() {
+ EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).andReturn(
+ new HashMap<String, Object>());
+ PowerMock.replayAll();
Map<String, String> expectedConfigs = new HashMap<>(defaultProducerConfigs);
expectedConfigs.put("client.id", "connector-producer-job-0");
- assertEquals(expectedConfigs, Worker.producerConfigs("connector-producer-" + TASK_ID, config));
+ assertEquals(expectedConfigs,
+ Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, config, connectorConfig, null, noneConnectorClientConfigOverridePolicy));
}
@Test
@@ -845,7 +862,34 @@ public class WorkerTest extends ThreadedTest {
expectedConfigs.put("acks", "-1");
expectedConfigs.put("linger.ms", "1000");
expectedConfigs.put("client.id", "producer-test-id");
- assertEquals(expectedConfigs, Worker.producerConfigs("connector-producer-" + TASK_ID, configWithOverrides));
+ EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).andReturn(
+ new HashMap<String, Object>());
+ PowerMock.replayAll();
+ assertEquals(expectedConfigs,
+ Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy));
+ }
+
+ @Test
+ public void testProducerConfigsWithClientOverrides() {
+ Map<String, String> props = new HashMap<>(workerProps);
+ props.put("producer.acks", "-1");
+ props.put("producer.linger.ms", "1000");
+ props.put("producer.client.id", "producer-test-id");
+ WorkerConfig configWithOverrides = new StandaloneConfig(props);
+
+ Map<String, String> expectedConfigs = new HashMap<>(defaultProducerConfigs);
+ expectedConfigs.put("acks", "-1");
+ expectedConfigs.put("linger.ms", "5000");
+ expectedConfigs.put("batch.size", "1000");
+ expectedConfigs.put("client.id", "producer-test-id");
+ Map<String, Object> connConfig = new HashMap<String, Object>();
+ connConfig.put("linger.ms", "5000");
+ connConfig.put("batch.size", "1000");
+ EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX))
+ .andReturn(connConfig);
+ PowerMock.replayAll();
+ assertEquals(expectedConfigs,
+ Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy));
}
@Test
@@ -853,7 +897,10 @@ public class WorkerTest extends ThreadedTest {
Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs);
expectedConfigs.put("group.id", "connect-test");
expectedConfigs.put("client.id", "connector-consumer-test-1");
- assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), config));
+ EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new HashMap<>());
+ PowerMock.replayAll();
+ assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), config, connectorConfig,
+ null, noneConnectorClientConfigOverridePolicy));
}
@Test
@@ -869,9 +916,95 @@ public class WorkerTest extends ThreadedTest {
expectedConfigs.put("auto.offset.reset", "latest");
expectedConfigs.put("max.poll.records", "1000");
expectedConfigs.put("client.id", "consumer-test-id");
- assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides));
+ EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new HashMap<>());
+ PowerMock.replayAll();
+ assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
+ null, noneConnectorClientConfigOverridePolicy));
+
}
+ @Test
+ public void testConsumerConfigsWithClientOverrides() {
+ Map<String, String> props = new HashMap<>(workerProps);
+ props.put("consumer.auto.offset.reset", "latest");
+ props.put("consumer.max.poll.records", "5000");
+ WorkerConfig configWithOverrides = new StandaloneConfig(props);
+
+ Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs);
+ expectedConfigs.put("group.id", "connect-test");
+ expectedConfigs.put("auto.offset.reset", "latest");
+ expectedConfigs.put("max.poll.records", "5000");
+ expectedConfigs.put("max.poll.interval.ms", "1000");
+ expectedConfigs.put("client.id", "connector-consumer-test-1");
+ Map<String, Object> connConfig = new HashMap<String, Object>();
+ connConfig.put("max.poll.records", "5000");
+ connConfig.put("max.poll.interval.ms", "1000");
+ EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX))
+ .andReturn(connConfig);
+ PowerMock.replayAll();
+ assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
+ null, allConnectorClientConfigOverridePolicy));
+ }
+
+ @Test(expected = ConnectException.class)
+ public void testConsumerConfigsClientOverridesWithNonePolicy() {
+ Map<String, String> props = new HashMap<>(workerProps);
+ props.put("consumer.auto.offset.reset", "latest");
+ props.put("consumer.max.poll.records", "5000");
+ WorkerConfig configWithOverrides = new StandaloneConfig(props);
+
+ Map<String, Object> connConfig = new HashMap<String, Object>();
+ connConfig.put("max.poll.records", "5000");
+ connConfig.put("max.poll.interval.ms", "1000");
+ EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX))
+ .andReturn(connConfig);
+ PowerMock.replayAll();
+ Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
+ null, noneConnectorClientConfigOverridePolicy);
+ }
+
+ @Test
+ public void testAdminConfigsClientOverridesWithAllPolicy() {
+ Map<String, String> props = new HashMap<>(workerProps);
+ props.put("admin.client.id", "testid");
+ props.put("admin.metadata.max.age.ms", "5000");
+ WorkerConfig configWithOverrides = new StandaloneConfig(props);
+
+ Map<String, Object> connConfig = new HashMap<String, Object>();
+ connConfig.put("metadata.max.age.ms", "10000");
+
+ Map<String, String> expectedConfigs = new HashMap<>();
+ expectedConfigs.put("bootstrap.servers", "localhost:9092");
+ expectedConfigs.put("client.id", "testid");
+ expectedConfigs.put("metadata.max.age.ms", "10000");
+
+ EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX))
+ .andReturn(connConfig);
+ PowerMock.replayAll();
+ assertEquals(expectedConfigs, Worker.adminConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
+ null, allConnectorClientConfigOverridePolicy));
+
+ }
+
+ @Test(expected = ConnectException.class)
+ public void testAdminConfigsClientOverridesWithNonePolicy() {
+ Map<String, String> props = new HashMap<>(workerProps);
+ props.put("admin.client.id", "testid");
+ props.put("admin.metadata.max.age.ms", "5000");
+ WorkerConfig configWithOverrides = new StandaloneConfig(props);
+
+ Map<String, Object> connConfig = new HashMap<String, Object>();
+ connConfig.put("metadata.max.age.ms", "10000");
+
+ EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX))
+ .andReturn(connConfig);
+ PowerMock.replayAll();
+ Worker.adminConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
+ null, noneConnectorClientConfigOverridePolicy);
+
+ }
+
+
private void assertStatistics(Worker worker, int connectors, int tasks) {
MetricGroup workerMetrics = worker.workerMetricsGroup().metricGroup();
assertEquals(connectors, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-count"), 0.0001d);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index b381263..b03ddf3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
@@ -177,6 +179,8 @@ public class DistributedHerderTest {
private SinkConnectorConfig conn1SinkConfig;
private SinkConnectorConfig conn1SinkConfigUpdated;
private short connectProtocolVersion;
+ private final ConnectorClientConfigOverridePolicy
+ noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
@Before
public void setUp() throws Exception {
@@ -191,7 +195,7 @@ public class DistributedHerderTest {
herder = PowerMock.createPartialMock(DistributedHerder.class,
new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"},
new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
- statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time);
+ statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy);
configUpdateListener = herder.new ConfigUpdateListener();
rebalanceListener = herder.new RebalanceListener(time);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 8aa1c70..e020cee 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
@@ -111,11 +113,15 @@ public class StandaloneHerderTest {
@Mock protected Callback<Herder.Created<ConnectorInfo>> createCallback;
@Mock protected StatusBackingStore statusBackingStore;
+ private final ConnectorClientConfigOverridePolicy
+ noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
+
+
@Before
public void setup() {
worker = PowerMock.createMock(Worker.class);
herder = PowerMock.createPartialMock(StandaloneHerder.class, new String[]{"connectorTypeForClass"},
- worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer));
+ worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy);
plugins = PowerMock.createMock(Plugins.class);
pluginLoader = PowerMock.createMock(PluginClassLoader.class);
delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index e610812..07c2755 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -362,13 +362,14 @@ public class EmbeddedConnectCluster {
try (OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream())) {
out.write(body);
}
- try (InputStream is = httpCon.getInputStream()) {
- int c;
- StringBuilder response = new StringBuilder();
- while ((c = is.read()) != -1) {
- response.append((char) c);
+ if (httpCon.getResponseCode() < HttpURLConnection.HTTP_BAD_REQUEST) {
+ try (InputStream is = httpCon.getInputStream()) {
+ log.info("PUT response for URL={} is {}", url, responseToString(is));
+ }
+ } else {
+ try (InputStream is = httpCon.getErrorStream()) {
+ log.info("PUT error response for URL={} is {}", url, responseToString(is));
}
- log.info("Put response for URL={} is {}", url, response);
}
return httpCon.getResponseCode();
}
@@ -392,7 +393,7 @@ public class EmbeddedConnectCluster {
while ((c = is.read()) != -1) {
response.append((char) c);
}
- log.debug("Get response for URL={} is {}", url, response);
+ log.debug("GET response for URL={} is {}", url, response);
return response.toString();
} catch (IOException e) {
Response.Status status = Response.Status.fromStatusCode(httpCon.getResponseCode());
@@ -413,6 +414,15 @@ public class EmbeddedConnectCluster {
return httpCon.getResponseCode();
}
+ private String responseToString(InputStream stream) throws IOException {
+ int c;
+ StringBuilder response = new StringBuilder();
+ while ((c = stream.read()) != -1) {
+ response.append((char) c);
+ }
+ return response.toString();
+ }
+
public static class Builder {
private String name = UUID.randomUUID().toString();
private Map<String, String> workerProps = new HashMap<>();