You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/05/15 19:03:35 UTC

[kafka] branch 2.4 updated: KAFKA-9537 - Cleanup error messages for abstract transformations (#8090)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new c7633dc  KAFKA-9537 - Cleanup error messages for abstract transformations (#8090)
c7633dc is described below

commit c7633dcbe9ac18bec8857ca0b9661a03715a5fc4
Author: Jeremy Custenborder <jc...@gmail.com>
AuthorDate: Fri May 15 12:14:20 2020 -0500

    KAFKA-9537 - Cleanup error messages for abstract transformations (#8090)
    
    Added a check for whether the transformation class is abstract. If so, throw a ConfigException whose message gives the user guidance by suggesting the public nested child classes that are themselves not abstract.
    
    Author: Jeremy Custenborder <jc...@gmail.com>
    Reviewer: Randall Hauch <rh...@gmail.com>
---
 .../kafka/connect/runtime/ConnectorConfig.java     | 19 +++++-
 .../kafka/connect/runtime/ConnectorConfigTest.java | 73 ++++++++++++++++++++++
 2 files changed, 91 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 632f5d3..b7340ae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.transforms.Transformation;
 
+import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,6 +38,8 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
@@ -329,11 +332,25 @@ public class ConnectorConfig extends AbstractConfig {
         if (transformationCls == null || !Transformation.class.isAssignableFrom(transformationCls)) {
             throw new ConfigException(key, String.valueOf(transformationCls), "Not a Transformation");
         }
+        if (Modifier.isAbstract(transformationCls.getModifiers())) {
+            String childClassNames = Stream.of(transformationCls.getClasses())
+                .filter(transformationCls::isAssignableFrom)
+                .filter(c -> !Modifier.isAbstract(c.getModifiers()))
+                .filter(c -> Modifier.isPublic(c.getModifiers()))
+                .map(Class::getName)
+                .collect(Collectors.joining(", "));
+            String message = childClassNames.trim().isEmpty() ?
+                "Transformation is abstract and cannot be created." :
+                "Transformation is abstract and cannot be created. Did you mean " + childClassNames + "?";
+            throw new ConfigException(key, String.valueOf(transformationCls), message);
+        }
         Transformation transformation;
         try {
             transformation = transformationCls.asSubclass(Transformation.class).newInstance();
         } catch (Exception e) {
-            throw new ConfigException(key, String.valueOf(transformationCls), "Error getting config definition from Transformation: " + e.getMessage());
+            ConfigException exception = new ConfigException(key, String.valueOf(transformationCls), "Error getting config definition from Transformation: " + e.getMessage());
+            exception.initCause(e);
+            throw exception;
         }
         ConfigDef configDef = transformation.config();
         if (null == configDef) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index fe1bf26..f674a8e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -177,4 +177,77 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         assertEquals(84, ((SimpleTransformation) transformations.get(1)).magicNumber);
     }
 
+    /** Expects a ConfigException naming the abstract-class problem; fails if none is thrown. */
+    @Test
+    public void abstractTransform() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", AbstractTransformation.class.getName());
+        try {
+            new ConnectorConfig(MOCK_PLUGINS, props);
+            throw new AssertionError("Expected a ConfigException because the transformation is abstract");
+        } catch (ConfigException ex) {
+            assertTrue(ex.getMessage().contains("Transformation is abstract and cannot be created."));
+        }
+    }
+    /**
+     * Expects a ConfigException that reports the abstract class and names both concrete nested classes.
+     */
+    @Test
+    public void abstractKeyValueTransform() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", AbstractKeyValueTransformation.class.getName());
+        try {
+            new ConnectorConfig(MOCK_PLUGINS, props);
+            // Reaching this line means no exception was thrown; fail instead of passing silently.
+            throw new AssertionError("Expected a ConfigException because the transformation is abstract");
+        } catch (ConfigException ex) {
+            String message = ex.getMessage();
+            assertTrue(message.contains("Transformation is abstract and cannot be created."));
+            assertTrue(message.contains(AbstractKeyValueTransformation.Key.class.getName()));
+            assertTrue(message.contains(AbstractKeyValueTransformation.Value.class.getName()));
+        }
+    }
+
+    public static abstract class AbstractTransformation<R extends ConnectRecord<R>> implements Transformation<R>  {
+        // Declares no members; abstractTransform() configures this class name as the transform type.
+    }
+
+    /**
+     * Abstract no-op transformation whose public nested {@link Key} and {@link Value} classes are concrete.
+     */
+    public static abstract class AbstractKeyValueTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+        @Override
+        public R apply(R record) {
+            return null;
+        }
+
+        @Override
+        public ConfigDef config() {
+            return new ConfigDef();
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+
+        /** Concrete subclass; forwards its type parameter so the parent is not used as a raw type. */
+        public static class Key<R extends ConnectRecord<R>> extends AbstractKeyValueTransformation<R> {
+        }
+
+        /** Concrete subclass; forwards its type parameter so the parent is not used as a raw type. */
+        public static class Value<R extends ConnectRecord<R>> extends AbstractKeyValueTransformation<R> {
+        }
+    }
+
+
 }