You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/05/22 20:56:02 UTC

[kafka] branch trunk updated: KAFKA-5807 - Check Connector.config() and Transformation.config() returns a valid ConfigDef

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7ecfbab  KAFKA-5807 - Check Connector.config() and Transformation.config() returns a valid ConfigDef
7ecfbab is described below

commit 7ecfbab92fa0993a3046ff0c3d4fc69f319f13a3
Author: Jeremy Custenborder <jc...@gmail.com>
AuthorDate: Tue May 22 13:53:15 2018 -0700

    KAFKA-5807 - Check Connector.config() and Transformation.config() returns a valid ConfigDef
    
    Little back story on this. Was helping a user over email. This could be much easier to debug if we assume that the connector developer might not return valid configs. For example Intellij will generate a stub that returns a null. This was the case that inspired this JIRA.
    
    Author: Jeremy Custenborder <jc...@gmail.com>
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #3762 from jcustenborder/KAFKA-5807
---
 .../org/apache/kafka/connect/connector/Connector.java    |  8 +++++++-
 .../org/apache/kafka/connect/runtime/AbstractHerder.java | 16 ++++++++++++++++
 .../apache/kafka/connect/runtime/ConnectorConfig.java    | 13 ++++++++++++-
 3 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
index a8a5dab..30dfd3c 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.connector;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.connect.errors.ConnectException;
 
 import java.util.List;
 import java.util.Map;
@@ -130,13 +131,18 @@ public abstract class Connector {
      */
     public Config validate(Map<String, String> connectorConfigs) {
         ConfigDef configDef = config();
+        if (null == configDef) {
+            throw new ConnectException(
+                String.format("%s.config() must return a ConfigDef that is not null.", this.getClass().getName())
+            );
+        }
         List<ConfigValue> configValues = configDef.validate(connectorConfigs);
         return new Config(configValues);
     }
 
     /**
      * Define the configuration for the connector.
-     * @return The ConfigDef for this connector.
+     * @return The ConfigDef for this connector; may not be null.
      */
     public abstract ConfigDef config();
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 424b474..c315686 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -271,7 +271,23 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
 
             // do custom connector-specific validation
             Config config = connector.validate(connectorProps);
+            if (null == config) {
+                throw new BadRequestException(
+                    String.format(
+                        "%s.validate() must return a Config that is not null.",
+                        connector.getClass().getName()
+                    )
+                );
+            }
             ConfigDef configDef = connector.config();
+            if (null == configDef) {
+                throw new BadRequestException(
+                    String.format(
+                        "%s.config() must return a ConfigDef that is not null.",
+                        connector.getClass().getName()
+                    )
+                );
+            }
             configKeys.putAll(configDef.configKeys());
             allGroups.addAll(configDef.groups());
             configValues.addAll(config.configValues());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 6a90310..a8dd49a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -227,11 +227,22 @@ public class ConnectorConfig extends AbstractConfig {
         if (transformationCls == null || !Transformation.class.isAssignableFrom(transformationCls)) {
             throw new ConfigException(key, String.valueOf(transformationCls), "Not a Transformation");
         }
+        Transformation transformation;
         try {
-            return (transformationCls.asSubclass(Transformation.class).newInstance()).config();
+            transformation = transformationCls.asSubclass(Transformation.class).newInstance();
         } catch (Exception e) {
             throw new ConfigException(key, String.valueOf(transformationCls), "Error getting config definition from Transformation: " + e.getMessage());
         }
+        ConfigDef configDef = transformation.config();
+        if (null == configDef) {
+            throw new ConnectException(
+                String.format(
+                    "%s.config() must return a ConfigDef that is not null.",
+                    transformationCls.getName()
+                )
+            );
+        }
+        return configDef;
     }
 
     /**

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.