You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/05/15 19:03:35 UTC
[kafka] branch 2.4 updated: KAFKA-9537 - Cleanup error messages for abstract transformations (#8090)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new c7633dc KAFKA-9537 - Cleanup error messages for abstract transformations (#8090)
c7633dc is described below
commit c7633dcbe9ac18bec8857ca0b9661a03715a5fc4
Author: Jeremy Custenborder <jc...@gmail.com>
AuthorDate: Fri May 15 12:14:20 2020 -0500
KAFKA-9537 - Cleanup error messages for abstract transformations (#8090)
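Added a check for whether the configured transformation class is abstract. If it is, a ConfigException is thrown with guidance for the user: the message suggests the class's nested child classes, listing only those that are public and not themselves abstract.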
Author: Jeremy Custenborder <jc...@gmail.com>
Reviewer: Randall Hauch <rh...@gmail.com>
---
.../kafka/connect/runtime/ConnectorConfig.java | 19 +++++-
.../kafka/connect/runtime/ConnectorConfigTest.java | 73 ++++++++++++++++++++++
2 files changed, 91 insertions(+), 1 deletion(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 632f5d3..b7340ae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.transforms.Transformation;
+import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -37,6 +38,8 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
@@ -329,11 +332,25 @@ public class ConnectorConfig extends AbstractConfig {
if (transformationCls == null || !Transformation.class.isAssignableFrom(transformationCls)) {
throw new ConfigException(key, String.valueOf(transformationCls), "Not a Transformation");
}
+ if (Modifier.isAbstract(transformationCls.getModifiers())) {
+ String childClassNames = Stream.of(transformationCls.getClasses())
+ .filter(transformationCls::isAssignableFrom)
+ .filter(c -> !Modifier.isAbstract(c.getModifiers()))
+ .filter(c -> Modifier.isPublic(c.getModifiers()))
+ .map(Class::getName)
+ .collect(Collectors.joining(", "));
+ String message = childClassNames.trim().isEmpty() ?
+ "Transformation is abstract and cannot be created." :
+ "Transformation is abstract and cannot be created. Did you mean " + childClassNames + "?";
+ throw new ConfigException(key, String.valueOf(transformationCls), message);
+ }
Transformation transformation;
try {
transformation = transformationCls.asSubclass(Transformation.class).newInstance();
} catch (Exception e) {
- throw new ConfigException(key, String.valueOf(transformationCls), "Error getting config definition from Transformation: " + e.getMessage());
+ ConfigException exception = new ConfigException(key, String.valueOf(transformationCls), "Error getting config definition from Transformation: " + e.getMessage());
+ exception.initCause(e);
+ throw exception;
}
ConfigDef configDef = transformation.config();
if (null == configDef) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index fe1bf26..f674a8e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -177,4 +177,77 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
assertEquals(84, ((SimpleTransformation) transformations.get(1)).magicNumber);
}
+    /**
+     * Verifies that configuring a transform whose type is an abstract class with no
+     * nested subclasses is rejected with a {@link ConfigException} whose message
+     * states that the transformation is abstract.
+     */
+    @Test
+    public void abstractTransform() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", AbstractTransformation.class.getName());
+        try {
+            new ConnectorConfig(MOCK_PLUGINS, props);
+        } catch (ConfigException ex) {
+            assertTrue(
+                ex.getMessage().contains("Transformation is abstract and cannot be created.")
+            );
+            return;
+        }
+        // Reaching this point means no ConfigException was thrown. Fail explicitly so
+        // the test cannot pass vacuously; AssertionError is not caught by the block above.
+        throw new AssertionError("Expected a ConfigException for an abstract transformation class");
+    }
+    /**
+     * Verifies that configuring a transform whose type is an abstract class with
+     * concrete nested subclasses is rejected with a {@link ConfigException} whose
+     * message both states that the transformation is abstract and names the nested
+     * {@code Key} and {@code Value} classes.
+     */
+    @Test
+    public void abstractKeyValueTransform() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", AbstractKeyValueTransformation.class.getName());
+        try {
+            new ConnectorConfig(MOCK_PLUGINS, props);
+        } catch (ConfigException ex) {
+            assertTrue(
+                ex.getMessage().contains("Transformation is abstract and cannot be created.")
+            );
+            assertTrue(
+                ex.getMessage().contains(AbstractKeyValueTransformation.Key.class.getName())
+            );
+            assertTrue(
+                ex.getMessage().contains(AbstractKeyValueTransformation.Value.class.getName())
+            );
+            return;
+        }
+        // Reaching this point means no ConfigException was thrown. Fail explicitly so
+        // the test cannot pass vacuously; AssertionError is not caught by the block above.
+        throw new AssertionError("Expected a ConfigException for an abstract transformation class");
+    }
+
+ /**
+ * Test fixture: an abstract {@link Transformation} that implements none of the
+ * interface methods and declares no nested classes. Used by
+ * {@code abstractTransform()} as the configured transform type.
+ */
+ public static abstract class AbstractTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+
+ }
+
+    /**
+     * Test fixture: an abstract {@link Transformation} with two public, concrete
+     * nested subclasses, {@link Key} and {@link Value}. Every interface method is
+     * implemented here as a no-op, so the class is abstract by declaration only.
+     * Used by {@code abstractKeyValueTransform()} as the configured transform type.
+     *
+     * @param <R> the record type handled by the transformation
+     */
+    public static abstract class AbstractKeyValueTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+        /** No-op: always returns {@code null}. */
+        @Override
+        public R apply(R record) {
+            return null;
+        }
+
+        /** Returns a new, empty {@link ConfigDef}. */
+        @Override
+        public ConfigDef config() {
+            return new ConfigDef();
+        }
+
+        /** No-op. */
+        @Override
+        public void close() {
+
+        }
+
+        /** No-op: the supplied configs are ignored. */
+        @Override
+        public void configure(Map<String, ?> configs) {
+
+        }
+
+
+        /**
+         * Concrete nested subclass. Parameterized so that it extends the generic
+         * parent rather than its raw type.
+         */
+        public static class Key<R extends ConnectRecord<R>> extends AbstractKeyValueTransformation<R> {
+
+
+        }
+
+        /**
+         * Concrete nested subclass. Parameterized so that it extends the generic
+         * parent rather than its raw type.
+         */
+        public static class Value<R extends ConnectRecord<R>> extends AbstractKeyValueTransformation<R> {
+
+        }
+    }
+
+
}