You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/05/22 20:56:02 UTC
[kafka] branch trunk updated: KAFKA-5807 - Check Connector.config()
and Transformation.config() returns a valid ConfigDef
This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7ecfbab KAFKA-5807 - Check Connector.config() and Transformation.config() returns a valid ConfigDef
7ecfbab is described below
commit 7ecfbab92fa0993a3046ff0c3d4fc69f319f13a3
Author: Jeremy Custenborder <jc...@gmail.com>
AuthorDate: Tue May 22 13:53:15 2018 -0700
KAFKA-5807 - Check Connector.config() and Transformation.config() returns a valid ConfigDef
Little back story on this. Was helping a user over email. This could be much easier to debug if we assume that the connector developer might not return valid configs. For example Intellij will generate a stub that returns a null. This was the case that inspired this JIRA.
Author: Jeremy Custenborder <jc...@gmail.com>
Reviewers: Jason Gustafson <ja...@confluent.io>, Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #3762 from jcustenborder/KAFKA-5807
---
.../org/apache/kafka/connect/connector/Connector.java | 8 +++++++-
.../org/apache/kafka/connect/runtime/AbstractHerder.java | 16 ++++++++++++++++
.../apache/kafka/connect/runtime/ConnectorConfig.java | 13 ++++++++++++-
3 files changed, 35 insertions(+), 2 deletions(-)
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
index a8a5dab..30dfd3c 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.connector;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.connect.errors.ConnectException;
import java.util.List;
import java.util.Map;
@@ -130,13 +131,18 @@ public abstract class Connector {
*/
public Config validate(Map<String, String> connectorConfigs) {
ConfigDef configDef = config();
+ if (null == configDef) {
+ throw new ConnectException(
+ String.format("%s.config() must return a ConfigDef that is not null.", this.getClass().getName())
+ );
+ }
List<ConfigValue> configValues = configDef.validate(connectorConfigs);
return new Config(configValues);
}
/**
* Define the configuration for the connector.
- * @return The ConfigDef for this connector.
+ * @return The ConfigDef for this connector; may not be null.
*/
public abstract ConfigDef config();
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 424b474..c315686 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -271,7 +271,23 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
// do custom connector-specific validation
Config config = connector.validate(connectorProps);
+ if (null == config) {
+ throw new BadRequestException(
+ String.format(
+ "%s.validate() must return a Config that is not null.",
+ connector.getClass().getName()
+ )
+ );
+ }
ConfigDef configDef = connector.config();
+ if (null == configDef) {
+ throw new BadRequestException(
+ String.format(
+ "%s.config() must return a ConfigDef that is not null.",
+ connector.getClass().getName()
+ )
+ );
+ }
configKeys.putAll(configDef.configKeys());
allGroups.addAll(configDef.groups());
configValues.addAll(config.configValues());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 6a90310..a8dd49a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -227,11 +227,22 @@ public class ConnectorConfig extends AbstractConfig {
if (transformationCls == null || !Transformation.class.isAssignableFrom(transformationCls)) {
throw new ConfigException(key, String.valueOf(transformationCls), "Not a Transformation");
}
+ Transformation transformation;
try {
- return (transformationCls.asSubclass(Transformation.class).newInstance()).config();
+ transformation = transformationCls.asSubclass(Transformation.class).newInstance();
} catch (Exception e) {
throw new ConfigException(key, String.valueOf(transformationCls), "Error getting config definition from Transformation: " + e.getMessage());
}
+ ConfigDef configDef = transformation.config();
+ if (null == configDef) {
+ throw new ConnectException(
+ String.format(
+ "%s.config() must return a ConfigDef that is not null.",
+ transformationCls.getName()
+ )
+ );
+ }
+ return configDef;
}
/**
--
To stop receiving notification emails like this one, please contact
ewencp@apache.org.