You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/27 06:54:15 UTC

[GitHub] [kafka] kkonstantine commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

kkonstantine commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r430867370



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -257,12 +274,23 @@ public boolean includeRecordDetailsInErrorLog() {
         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+
             try {
                 @SuppressWarnings("unchecked")
-                final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
-                        .getDeclaredConstructor().newInstance();
-                transformation.configure(originalsWithPrefix(prefix));
-                transformations.add(transformation);
+                final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
+                Map<String, Object> configs = originalsWithPrefix(prefix);
+                Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
+                Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                transformation.configure(configs);
+                if (predicateAlias != null) {
+                    String predicatePrefix = "predicates." + predicateAlias + ".";

Review comment:
       I'd suggest declaring a `PREDICATES_PREFIX` variable for higher visibility. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+/**
+ * Decorator for a {@link Transformation} which applies the delegate only when a
+ * {@link Predicate} is true (or false, according to {@code negate}).
+ * @param <R>
+ */
+class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    static final String PREDICATE_CONFIG = "predicate";
+    static final String NEGATE_CONFIG = "negate";
+    /*test*/ Predicate<R> predicate;

Review comment:
       this type of comment is not something we use elsewhere and is not immediately obvious what it means. I'd suggest removing instead. 

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
##########
@@ -214,6 +216,187 @@ public void abstractKeyValueTransform() {
         }
     }
 
+

Review comment:
       nit: 2 extra lines

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java
##########
@@ -16,15 +16,17 @@
  */
 package org.apache.kafka.connect.integration;
 
-import org.apache.kafka.connect.errors.DataException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.stream.IntStream;
 
+import org.apache.kafka.connect.errors.DataException;

Review comment:
       nit: same comment as above

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
##########
@@ -16,19 +16,21 @@
  */
 package org.apache.kafka.connect.integration;
 
-import org.apache.kafka.connect.errors.DataException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.apache.kafka.connect.errors.DataException;

Review comment:
       nit: Our current import style says that these imports stay above the java ones. That's why these show up as changes here.  

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
##########
@@ -16,21 +16,23 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import java.util.Collections;

Review comment:
       nit: same as above

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -276,116 +304,251 @@ public boolean includeRecordDetailsInErrorLog() {
      * <p>
      * {@code requireFullConfig} specifies whether required config values that are missing should cause an exception to be thrown.
      */
+    @SuppressWarnings({"rawtypes", "unchecked"})
     public static ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) {
-        Object transformAliases = ConfigDef.parseType(TRANSFORMS_CONFIG, props.get(TRANSFORMS_CONFIG), Type.LIST);
-        if (!(transformAliases instanceof List)) {
-            return baseConfigDef;
-        }
-
         ConfigDef newDef = new ConfigDef(baseConfigDef);
-        LinkedHashSet<?> uniqueTransformAliases = new LinkedHashSet<>((List<?>) transformAliases);
-        for (Object o : uniqueTransformAliases) {
-            if (!(o instanceof String)) {
-                throw new ConfigException("Item in " + TRANSFORMS_CONFIG + " property is not of "
-                        + "type String");
+        new EnrichablePlugin<Transformation<?>>("Transformation", TRANSFORMS_CONFIG, TRANSFORMS_GROUP, (Class) Transformation.class,
+                props, requireFullConfig) {
+            @SuppressWarnings("rawtypes")
+            @Override
+            protected Set<PluginDesc<Transformation<?>>> plugins() {
+                return (Set) plugins.transformations();
             }
-            String alias = (String) o;
-            final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
-            final String group = TRANSFORMS_GROUP + ": " + alias;
-            int orderInGroup = 0;
-
-            final String transformationTypeConfig = prefix + "type";
-            final ConfigDef.Validator typeValidator = new ConfigDef.Validator() {
-                @Override
-                public void ensureValid(String name, Object value) {
-                    getConfigDefFromTransformation(transformationTypeConfig, (Class) value);
-                }
-            };
-            newDef.define(transformationTypeConfig, Type.CLASS, ConfigDef.NO_DEFAULT_VALUE, typeValidator, Importance.HIGH,
-                    "Class for the '" + alias + "' transformation.", group, orderInGroup++, Width.LONG, "Transformation type for " + alias,
-                    Collections.<String>emptyList(), new TransformationClassRecommender(plugins));
 
-            final ConfigDef transformationConfigDef;
-            try {
-                final String className = props.get(transformationTypeConfig);
-                final Class<?> cls = (Class<?>) ConfigDef.parseType(transformationTypeConfig, className, Type.CLASS);
-                transformationConfigDef = getConfigDefFromTransformation(transformationTypeConfig, cls);
-            } catch (ConfigException e) {
-                if (requireFullConfig) {
-                    throw e;
-                } else {
-                    continue;
+            @Override
+            protected ConfigDef initialConfigDef() {
+                // All Transformations get these config parameters implicitly
+                return super.initialConfigDef()
+                        .define(PredicatedTransformation.PREDICATE_CONFIG, Type.STRING, "", Importance.MEDIUM,
+                                "The alias of a predicate used to determine whether to apply this transformation.")
+                        .define(PredicatedTransformation.NEGATE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM,
+                                "Whether the configured predicate should be negated.");
+            }
+
+            @Override
+            protected Stream<Map.Entry<String, ConfigDef.ConfigKey>> configDefsForClass(String typeConfig) {
+                return super.configDefsForClass(typeConfig)
+                    .filter(entry -> {
+                        // The implicit parameters mask any from the transformer with the same name
+                        if (PredicatedTransformation.PREDICATE_CONFIG.equals(entry.getValue())
+                                || PredicatedTransformation.NEGATE_CONFIG.equals(entry.getValue())) {
+                            log.warn("Transformer config {} is masked by implicit config of that name",
+                                    entry.getValue());
+                            return false;
+                        } else {
+                            return true;
+                        }
+                    });
+            }
+
+            @Override
+            protected ConfigDef config(Transformation<?> transformation) {
+                return transformation.config();
+            }
+
+            @Override
+            protected void validateProps(String prefix) {
+                String prefixedNegate = prefix + PredicatedTransformation.NEGATE_CONFIG;
+                String prefixedPredicate = prefix + PredicatedTransformation.PREDICATE_CONFIG;
+                if (props.containsKey(prefixedNegate) &&
+                        !props.containsKey(prefixedPredicate)) {
+                    throw new ConfigException("Config '" + prefixedNegate + "' was provided " +
+                            "but there is no config '" + prefixedPredicate + "' defining a predicate to be negated.");
                 }
             }
+        }.enrich(newDef);
 
-            newDef.embed(prefix, group, orderInGroup, transformationConfigDef);
-        }
+        new EnrichablePlugin<Predicate<?>>("Predicate", PREDICATES_CONFIG, TRANSFORMS_GROUP,
+                (Class) Predicate.class, props, requireFullConfig) {
+            @Override
+            protected Set<PluginDesc<Predicate<?>>> plugins() {
+                return (Set) plugins.predicates();
+            }
 
+            @Override
+            protected ConfigDef config(Predicate<?> predicate) {
+                return predicate.config();
+            }
+        }.enrich(newDef);
         return newDef;
     }
 
     /**
-     * Return {@link ConfigDef} from {@code transformationCls}, which is expected to be a non-null {@code Class<Transformation>},
-     * by instantiating it and invoking {@link Transformation#config()}.
+     * An abstraction over "enrichable plugins" ({@link Transformation}s and {@link Predicate}s) used for computing the
+     * contribution to a Connectors ConfigDef.
+     *
+     * This is not entirely elegant because
+     * although they basically use the same "alias prefix" configuration idiom there are some differences.
+     * The abstract method pattern is used to cope with this.
+     * @param <T> The type of plugin (either {@code Transformation} or {@code Predicate}).
      */
-    static ConfigDef getConfigDefFromTransformation(String key, Class<?> transformationCls) {
-        if (transformationCls == null || !Transformation.class.isAssignableFrom(transformationCls)) {
-            throw new ConfigException(key, String.valueOf(transformationCls), "Not a Transformation");
-        }
-        if (Modifier.isAbstract(transformationCls.getModifiers())) {
-            String childClassNames = Stream.of(transformationCls.getClasses())
-                .filter(transformationCls::isAssignableFrom)
-                .filter(c -> !Modifier.isAbstract(c.getModifiers()))
-                .filter(c -> Modifier.isPublic(c.getModifiers()))
-                .map(Class::getName)
-                .collect(Collectors.joining(", "));
-            String message = childClassNames.trim().isEmpty() ?
-                "Transformation is abstract and cannot be created." :
-                "Transformation is abstract and cannot be created. Did you mean " + childClassNames + "?";
-            throw new ConfigException(key, String.valueOf(transformationCls), message);
+    static abstract class EnrichablePlugin<T> {
+
+        private final String aliasKind;
+        private final String aliasConfig;
+        private final String aliasGroup;
+        private final Class<T> baseClass;
+        private final Map<String, String> props;
+        private final boolean requireFullConfig;
+
+        public EnrichablePlugin(
+                String aliasKind,
+                String aliasConfig, String aliasGroup, Class<T> baseClass,
+                Map<String, String> props, boolean requireFullConfig) {
+            this.aliasKind = aliasKind;
+            this.aliasConfig = aliasConfig;
+            this.aliasGroup = aliasGroup;
+            this.baseClass = baseClass;
+            this.props = props;
+            this.requireFullConfig = requireFullConfig;
         }
-        Transformation transformation;
-        try {
-            transformation = transformationCls.asSubclass(Transformation.class).getConstructor().newInstance();
-        } catch (Exception e) {
-            ConfigException exception = new ConfigException(key, String.valueOf(transformationCls), "Error getting config definition from Transformation: " + e.getMessage());
-            exception.initCause(e);
-            throw exception;
+
+        /** Add the configs for this alias to the given {@code ConfigDef}. */
+        void enrich(ConfigDef newDef) {
+            Object aliases = ConfigDef.parseType(aliasConfig, props.get(aliasConfig), Type.LIST);
+            if (!(aliases instanceof List)) {
+                return;
+            }
+
+            LinkedHashSet<?> uniqueAliases = new LinkedHashSet<>((List<?>) aliases);
+            for (Object o : uniqueAliases) {
+                if (!(o instanceof String)) {
+                    throw new ConfigException("Item in " + aliasConfig + " property is not of "
+                            + "type String");
+                }
+                String alias = (String) o;
+                final String prefix = aliasConfig + "." + alias + ".";
+                final String group = aliasGroup + ": " + alias;
+                int orderInGroup = 0;
+
+                final String typeConfig = prefix + "type";
+                final ConfigDef.Validator typeValidator = new ConfigDef.Validator() {

Review comment:
       Consider using `LambdaValidator` to be able to add a lambda for the `toString` method of this validator.  

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java
##########
@@ -37,22 +39,26 @@
     private final ConnectorHandle connectorHandle;
     private final AtomicInteger partitionsAssigned = new AtomicInteger(0);
     private final StartAndStopCounter startAndStopCounter = new StartAndStopCounter();
+    private final Consumer<SinkRecord> consumer;
 
     private CountDownLatch recordsRemainingLatch;
     private CountDownLatch recordsToCommitLatch;
     private int expectedRecords = -1;
     private int expectedCommits = -1;
 
-    public TaskHandle(ConnectorHandle connectorHandle, String taskId) {
-        log.info("Created task {} for connector {}", taskId, connectorHandle);
+    public TaskHandle(ConnectorHandle connectorHandle, String taskId, Consumer<SinkRecord> consumer) {
         this.taskId = taskId;
         this.connectorHandle = connectorHandle;
+        this.consumer = consumer;
     }
 
     /**
      * Record a message arrival at the task and the connector overall.
      */
-    public void record() {

Review comment:
       This might break existing tests that depend on Connect's integration tests framework. Probably good idea to keep it, in which case the consumer should be ignored. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org