Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/20 18:34:15 UTC

[GitHub] [kafka] tombentley opened a new pull request #8699: KAFKA-9673: Filter and Conditional SMTs

tombentley opened a new pull request #8699:
URL: https://github.com/apache/kafka/pull/8699


   * Add Predicate interface
   * Add Filter SMT
   * Add the predicate implementations defined in the KIP.
   * Create abstraction in ConnectorConfig for configuring Transformations and Connectors with the "alias prefix" mechanism
   * Add tests and fix existing tests.
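
For readers unfamiliar with the "alias prefix" mechanism mentioned above, the following is a minimal sketch of the idea, not the code from this pull request. It assumes a flat map of connector properties and a hypothetical helper `withPrefix` that selects the entries for one alias and strips the prefix. The alias names `dropTombstones` and `isTombstone` are invented for illustration; the key layout (`transforms.<alias>.type`, `transforms.<alias>.predicate`, `predicates.<alias>.type`) follows the `ConnectorConfig` hunk quoted later in this thread.

```java
import java.util.HashMap;
import java.util.Map;

final class AliasPrefixSketch {
    private AliasPrefixSketch() { }

    // Return the entries whose key starts with `prefix`, with the prefix stripped.
    static Map<String, Object> withPrefix(Map<String, ?> flat, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : flat.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    // Example connector properties; the alias names are made up for illustration.
    static Map<String, String> exampleConfig() {
        Map<String, String> props = new HashMap<>();
        props.put("transforms", "dropTombstones");
        props.put("transforms.dropTombstones.type",
                "org.apache.kafka.connect.transforms.Filter");
        props.put("transforms.dropTombstones.predicate", "isTombstone");
        props.put("predicates", "isTombstone");
        props.put("predicates.isTombstone.type",
                "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone");
        return props;
    }
}
```

Calling `withPrefix(exampleConfig(), "transforms.dropTombstones.")` yields a map with the keys `type` and `predicate`, which is the view a single transformation alias would be configured from under this assumption.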
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r429067483



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -257,12 +273,25 @@ public boolean includeRecordDetailsInErrorLog() {
         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
                         .getDeclaredConstructor().newInstance();
-                transformation.configure(originalsWithPrefix(prefix));
-                transformations.add(transformation);
+                Map<String, Object> configs = originalsWithPrefix(prefix);
+                Object predicateAlias = configs.remove("predicate");
+                Object negate = configs.remove("negate");
+                transformation.configure(configs);
+                if (predicateAlias != null) {
+                    String predicatePrefix = "predicates." + predicateAlias + ".";
+                    @SuppressWarnings("unchecked")
+                    Predicate<R> predicate = getClass(predicatePrefix + "type").asSubclass(Predicate.class)
+                            .getDeclaredConstructor().newInstance();
+                    predicate.configure(originalsWithPrefix(predicatePrefix));
+                    transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));

Review comment:
       Actually, maybe I can make this nicer by changing how ConfigDef instantiates the `PredicatedTransformation`, let me see...







[GitHub] [kafka] C0urante commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r428333743



##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * A predicate which is true for records which are tombstones (i.e. have null key).
+ * @param <R> The type of connect record.
+ */
+public class RecordIsTombstone<R extends ConnectRecord<R>> implements Predicate<R> {
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef();

Review comment:
       Probably won't impact performance too much but we could technically use a single `ConfigDef` instance for the entire class instead of creating a new one every time this method is called.

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * A predicate which is true for records which are tombstones (i.e. have null key).
+ * @param <R> The type of connect record.
+ */
+public class RecordIsTombstone<R extends ConnectRecord<R>> implements Predicate<R> {
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef();
+    }
+
+    @Override
+    public boolean test(R record) {
+        return record.key() == null;

Review comment:
       I think we want to check the value instead of the key here?
   ```suggestion
           return record.value() == null;
   ```
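
To make the suggested check concrete, here is a small self-contained sketch. `SketchRecord` is a hypothetical stand-in for a Connect record and `java.util.function.Predicate` stands in for the new `Predicate` interface, so this shows the logic only, not the class under review.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for a Connect record; only key and value matter here.
final class SketchRecord {
    private final Object key;
    private final Object value;

    SketchRecord(Object key, Object value) {
        this.key = key;
        this.value = value;
    }

    Object key() {
        return key;
    }

    Object value() {
        return value;
    }
}

final class TombstoneSketch {
    // A tombstone is identified by its null value; the key is not inspected.
    static final Predicate<SketchRecord> IS_TOMBSTONE = record -> record.value() == null;

    private TombstoneSketch() { }
}
```

With this definition a record with a key and a null value is treated as a tombstone, while a record with a null key and a non-null value is not.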

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * A predicate which is true for records which are tombstones (i.e. have null key).

Review comment:
       I think a tombstone is defined as a record with a null value, not a null key:
   ```suggestion
    * A predicate which is true for records which are tombstones (i.e. have null values).
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+/**
+ * Decorator for a {@link Transformation} which applies the delegate only when a
+ * {@link Predicate} is true (or false, according to {@code negate}).
+ * @param <R>
+ */
+class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    /*test*/ final Predicate<R> predicate;
+    /*test*/ final Transformation<R> delegate;
+    /*test*/ final boolean negate;
+
+    PredicatedTransformation(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
+        this.predicate = predicate;
+        this.negate = negate;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+
+    }
+
+    @Override
+    public R apply(R record) {
+        if (negate ^ predicate.test(record)) {
+            return delegate.apply(record);
+        }
+        return record;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return null;

Review comment:
       Same comment here as with `Filter`; probably want to return a non-null `ConfigDef` here.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -257,12 +273,25 @@ public boolean includeRecordDetailsInErrorLog() {
         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
                         .getDeclaredConstructor().newInstance();
-                transformation.configure(originalsWithPrefix(prefix));
-                transformations.add(transformation);
+                Map<String, Object> configs = originalsWithPrefix(prefix);
+                Object predicateAlias = configs.remove("predicate");
+                Object negate = configs.remove("negate");

Review comment:
       Do we need to remove these properties here, or can we just read them? Removing might cause issues with SMTs that have config properties with these names; would leaving them in be likely to cause issues as well?
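
The trade-off raised here can be illustrated with a plain `Map`. This is a sketch under the assumption that an SMT might define its own `negate` or `predicate` property; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class ReservedKeysSketch {
    private ReservedKeysSketch() { }

    // Variant 1: strip the reserved keys before handing the map to the SMT.
    // An SMT that defines its own "negate" property would never see it.
    static Map<String, Object> withoutReserved(Map<String, Object> original) {
        Map<String, Object> copy = new HashMap<>(original);
        copy.remove("predicate");
        copy.remove("negate");
        return copy;
    }

    // Variant 2: only read a reserved key; the SMT's view stays untouched.
    static Object readOnly(Map<String, Object> original, String key) {
        return original.get(key);
    }
}
```

Variant 1 hides the two keys from the transformation, while variant 2 leaves them visible, so the transformation would have to tolerate properties it does not define.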

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -257,12 +273,25 @@ public boolean includeRecordDetailsInErrorLog() {
         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
                         .getDeclaredConstructor().newInstance();
-                transformation.configure(originalsWithPrefix(prefix));
-                transformations.add(transformation);
+                Map<String, Object> configs = originalsWithPrefix(prefix);
+                Object predicateAlias = configs.remove("predicate");
+                Object negate = configs.remove("negate");
+                transformation.configure(configs);
+                if (predicateAlias != null) {
+                    String predicatePrefix = "predicates." + predicateAlias + ".";
+                    @SuppressWarnings("unchecked")
+                    Predicate<R> predicate = getClass(predicatePrefix + "type").asSubclass(Predicate.class)
+                            .getDeclaredConstructor().newInstance();
+                    predicate.configure(originalsWithPrefix(predicatePrefix));
+                    transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));

Review comment:
       Just curious, why directly parse the `negate` property here instead of doing that in `PredicatedTransformation::configure`?
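
As a sketch of the alternative being asked about, the decorator could parse `negate` in its own `configure` method. The version below is an assumption-laden illustration: `java.util.function.Predicate` and `UnaryOperator` stand in for the Connect `Predicate` and `Transformation` interfaces, and `ConditionalSketch` is a hypothetical name.

```java
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical decorator: applies `delegate` only when the predicate outcome,
// optionally negated, is true.
final class ConditionalSketch<R> implements UnaryOperator<R> {
    private final Predicate<R> predicate;
    private final UnaryOperator<R> delegate;
    private boolean negate;

    ConditionalSketch(Predicate<R> predicate, UnaryOperator<R> delegate) {
        this.predicate = predicate;
        this.delegate = delegate;
    }

    // Parse "negate" here instead of at the call site; an absent key means false.
    void configure(Map<String, ?> configs) {
        Object raw = configs.get("negate");
        this.negate = raw != null && Boolean.parseBoolean(raw.toString());
    }

    @Override
    public R apply(R record) {
        // XOR: negate flips the predicate's verdict.
        return (negate ^ predicate.test(record)) ? delegate.apply(record) : record;
    }
}
```

With `negate` unset the delegate runs only for records the predicate accepts; with `negate=true` it runs only for records the predicate rejects.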

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * A predicate which is true for records with at least one header with the configured name.
+ * @param <R> The type of connect record.
+ */
+public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R> {
+
+    private static final String NAME_CONFIG_KEY = "name";
+    private String name;
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef().define(NAME_CONFIG_KEY, ConfigDef.Type.STRING, null,
+                new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
+                "The header name.");
+    }
+
+    @Override
+    public boolean test(R record) {
+        return record.headers().allWithName(name).hasNext();

Review comment:
       Might want to break this up to avoid an NPE since [Headers::allWithName](https://github.com/apache/kafka/blob/67770072da1bd13762af978faaa278c4039167a2/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java#L52) is technically allowed to return `null` in some situations:
   ```suggestion
           Iterator<Header> headersWithName = record.headers().allWithName(name);
           return headersWithName != null ? headersWithName.hasNext() : false;
   ```
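
The same null-safe check can be written as a small helper. This sketch uses a plain `Iterator<?>` in place of the header iterator, and `HeaderCheckSketch` is a hypothetical name.

```java
import java.util.Iterator;

final class HeaderCheckSketch {
    private HeaderCheckSketch() { }

    // True only when the iterator exists and yields at least one element.
    static boolean hasAny(Iterator<?> matches) {
        return matches != null && matches.hasNext();
    }
}
```

The short-circuit `&&` means `hasNext()` is never called on a null iterator.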

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -257,12 +273,25 @@ public boolean includeRecordDetailsInErrorLog() {
         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
                         .getDeclaredConstructor().newInstance();
-                transformation.configure(originalsWithPrefix(prefix));
-                transformations.add(transformation);
+                Map<String, Object> configs = originalsWithPrefix(prefix);
+                Object predicateAlias = configs.remove("predicate");
+                Object negate = configs.remove("negate");
+                transformation.configure(configs);
+                if (predicateAlias != null) {
+                    String predicatePrefix = "predicates." + predicateAlias + ".";
+                    @SuppressWarnings("unchecked")
+                    Predicate<R> predicate = getClass(predicatePrefix + "type").asSubclass(Predicate.class)
+                            .getDeclaredConstructor().newInstance();

Review comment:
       Blegh, was hoping we might be able to use `Utils::newInstance` or `AbstractConfig::newConfiguredInstance` but it looks like neither quite does what we need; the former doesn't do casting to a subclass unless you give it the FQCN of a class instead of an already-loaded `Class<?>` object, and the latter doesn't give enough control over exactly which properties the new instance is configured with.
   
   Since we're using this same logic in several different places in this file alone, we might consider expanding the `Utils` class with a new utility method that does this for us. Maybe something like:
   
   ```java
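   public static <T> T newInstance(Class<?> klass, Class<T> baseClass) throws ReflectiveOperationException {
       // Return an instance of klass that has been automatically cast to the type of baseClass
       return klass.asSubclass(baseClass).getDeclaredConstructor().newInstance();
   }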
   ```

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * A predicate which is true for records with at least one header with the configured name.
+ * @param <R> The type of connect record.
+ */
+public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R> {
+
+    private static final String NAME_CONFIG_KEY = "name";
+    private String name;
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef().define(NAME_CONFIG_KEY, ConfigDef.Type.STRING, null,
+                new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
+                "The header name.");
+    }
+
+    @Override
+    public boolean test(R record) {
+        return record.headers().allWithName(name).hasNext();
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        this.name = (String) configs.get(NAME_CONFIG_KEY);

Review comment:
       Might consider using a [SimpleConfig](https://github.com/apache/kafka/blob/67770072da1bd13762af978faaa278c4039167a2/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SimpleConfig.java) here. Won't make a huge difference with the class as-is, but will make it easier to make changes in the future if we ever want to expand on the configurability of this predicate.

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * Drops all records, filtering them from subsequent transformations in the chain.
+ * This is intended to be used conditionally to filter out records matching (or not matching)
+ * a particular {@link org.apache.kafka.connect.transforms.predicates.Predicate}.
+ * @param <R> The type of record.
+ */
+public class Filter<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    @Override
+    public R apply(R record) {
+        return null;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return null;

Review comment:
       I think this will fail during validation since [transformations must provide non-null ConfigDefs](https://github.com/apache/kafka/blob/67770072da1bd13762af978faaa278c4039167a2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L355-L363). Might want to instantiate a single empty static `ConfigDef` object for the class and just return that?
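
A minimal sketch of the "single static instance" idea follows. `StubConfigDef` is a hypothetical stand-in so that the example compiles without Kafka on the classpath; in the real class the field would hold an `org.apache.kafka.common.config.ConfigDef`.

```java
// Hypothetical stand-in for a configuration definition; not the Kafka class.
final class StubConfigDef {
    // Intentionally empty: the sketch only cares about instance identity.
}

final class DropAllSketch {
    // One shared, empty definition for the whole class.
    private static final StubConfigDef CONFIG_DEF = new StubConfigDef();

    // Never returns null and allocates nothing per call.
    StubConfigDef config() {
        return CONFIG_DEF;
    }
}
```

One caveat with the real class: `ConfigDef` is mutable (`define` modifies the instance), so sharing a single instance assumes callers do not add definitions to the object they receive.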

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * A predicate which is true for records with a topic name that matches the configured regular expression.
+ * @param <R> The type of connect record.
+ */
+public class TopicNameMatches<R extends ConnectRecord<R>> implements Predicate<R> {
+
+    public static final String PATTERN_CONFIG_KEY = "pattern";
+    private Pattern pattern;
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef().define(PATTERN_CONFIG_KEY, ConfigDef.Type.STRING, null,
+                new ConfigDef.Validator() {
+                    @Override
+                    public void ensureValid(String name, Object value) {
+                        if (value != null) {
+                            compile(name, value);
+                        }
+                    }
+                }, ConfigDef.Importance.MEDIUM,
+                "A Java regular expression for matching against the name of a record's topic.");
+    }
+
+    private Pattern compile(String name, Object value) {
+        try {
+            return Pattern.compile((String) value);
+        } catch (PatternSyntaxException e) {
+            throw new ConfigException(name, value, "entry must be a Java-compatible regular expression: " + e.getMessage());
+        }
+    }
+
+    @Override
+    public boolean test(R record) {
+        return record.topic() != null && pattern.matcher(record.topic()).matches();
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        this.pattern = compile(PATTERN_CONFIG_KEY, configs.get(PATTERN_CONFIG_KEY));

Review comment:
       Same comment as elsewhere: might want to use a `SimpleConfig` in this class.
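
Independently of how the value is read, the pattern handling in the hunk above can be sketched in isolation. This is an illustration only: `IllegalArgumentException` stands in for `ConfigException`, and `TopicPatternSketch` is a hypothetical name.

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

final class TopicPatternSketch {
    private TopicPatternSketch() { }

    // Compile eagerly so a bad expression fails at configuration time.
    static Pattern compile(String name, String regex) {
        try {
            return Pattern.compile(regex);
        } catch (PatternSyntaxException e) {
            // IllegalArgumentException stands in for ConfigException here.
            throw new IllegalArgumentException(
                    "Invalid value for " + name + ": " + e.getMessage(), e);
        }
    }

    // matches() requires the whole topic name to match, not a substring.
    static boolean topicMatches(Pattern pattern, String topic) {
        return topic != null && pattern.matcher(topic).matches();
    }
}
```

Because `Matcher.matches()` anchors at both ends, a pattern such as `orders-.*` accepts `orders-eu` but rejects `my-orders-eu`.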







[GitHub] [kafka] rhauch commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-634766420


   ok to test





[GitHub] [kafka] rhauch commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r431281159



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.transforms.Filter;
+import org.apache.kafka.connect.transforms.predicates.HasHeaderKey;
+import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
+import org.apache.kafka.connect.transforms.predicates.TopicNameMatches;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.PREDICATES_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * An integration test for connectors with transformations
+ */
+@Category(IntegrationTest.class)
+public class TransformationIntegrationTest {
+
+    private static final int NUM_RECORDS_PRODUCED = 2000;
+    private static final int NUM_TOPIC_PARTITIONS = 3;
+    private static final long RECORD_TRANSFER_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
+    private static final long OBSERVED_RECORDS_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
+    private static final int NUM_TASKS = 3;
+    private static final int NUM_WORKERS = 3;
+    private static final String CONNECTOR_NAME = "simple-conn";
+    private static final String SINK_CONNECTOR_CLASS_NAME = MonitorableSinkConnector.class.getSimpleName();
+    private static final String SOURCE_CONNECTOR_CLASS_NAME = MonitorableSourceConnector.class.getSimpleName();
+
+    private EmbeddedConnectCluster connect;
+    private ConnectorHandle connectorHandle;
+
+    @Before
+    public void setup() {
+        // setup Connect worker properties
+        Map<String, String> exampleWorkerProps = new HashMap<>();
+        exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
+
+        // setup Kafka broker properties
+        Properties exampleBrokerProps = new Properties();
+        exampleBrokerProps.put("auto.create.topics.enable", "false");
+
+        // build a Connect cluster backed by Kafka and Zk
+        connect = new EmbeddedConnectCluster.Builder()
+                .name("connect-cluster")
+                .numWorkers(NUM_WORKERS)
+                .numBrokers(1)
+                .workerProps(exampleWorkerProps)
+                .brokerProps(exampleBrokerProps)
+                .build();
+
+        // start the clusters
+        connect.start();

Review comment:
       Should we wait until all brokers and Connect workers are available, via something like:
   ```
           connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Brokers did not start in time.");
           connect.assertions().assertExactlyNumWorkersAreUp(numWorkers, "Worker did not start in time.");
   ```

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/transforms/predicates/Predicate.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * <p>A predicate on records.
+ * Predicates can be used to conditionally apply a {@link org.apache.kafka.connect.transforms.Transformation}
+ * by configuring the transformation's {@code predicate} (and {@code negate}) configuration parameters.
+ * In particular, the {@code Filter} transformation can be conditionally applied in order to filter
+ * certain records from further processing.
+ *
+ * <p>Implementations of this interface must be public and have a public constructor with no parameters.
+ *
+ * @param <R> The type of record.
+ */
+public interface Predicate<R extends ConnectRecord<R>> extends Configurable, AutoCloseable {
+
+    /**
+     * Configuration specification for this predicate.
+     */
+    ConfigDef config();
+
+    /**
+     * Returns whether the given record satisfies this predicate.
+     */

Review comment:
       ```suggestion
       /**
        * Returns whether the given record satisfies this predicate.
        *
        * @param record the record to evaluate; may not be null
        * @return true if the predicate matches, or false otherwise
        */
   ```

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+/**
+ * A predicate which is true for records with at least one header with the configured name.
+ * @param <R> The type of connect record.
+ */
+public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R> {
+
+    private static final String NAME_CONFIG = "name";
+    private static final ConfigDef CONFIG_DEF = new ConfigDef().define(NAME_CONFIG, ConfigDef.Type.STRING, null,

Review comment:
       Here the default is null, which means that the configuration validation allows the `name` field to not be set. Per the KIP, we want to require that `name` is set. To do that, we should use `ConfigDef.NO_DEFAULT_VALUE`:
   ```suggestion
       private static final ConfigDef CONFIG_DEF = new ConfigDef().define(NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
   ```
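
A minimal sketch of why a "no default" sentinel differs from a `null` default. It uses only the JDK and does not call Kafka's `ConfigDef`; the class `RequiredConfigSketch`, the `NO_DEFAULT` field and the `resolve` method are hypothetical names invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a config definition; this is not Kafka's ConfigDef.
class RequiredConfigSketch {

    // Sentinel meaning "no default". It is distinct from null, which is a legal default.
    static final Object NO_DEFAULT = new Object();

    // Resolve a key: use the supplied value, else the default, else fail if the key is required.
    static Object resolve(Map<String, Object> supplied, String key, Object defaultValue) {
        if (supplied.containsKey(key)) {
            return supplied.get(key);
        }
        if (defaultValue == NO_DEFAULT) {
            throw new IllegalArgumentException("Missing required configuration \"" + key + "\"");
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        Map<String, Object> empty = new HashMap<>();
        // With a null default, an unset "name" silently resolves to null.
        System.out.println(resolve(empty, "name", null));
        // With the sentinel, the same lookup is rejected.
        try {
            resolve(empty, "name", NO_DEFAULT);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a `null` default the missing setting would only surface later, when the predicate is used; with the sentinel it is rejected at configuration time.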

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/transforms/predicates/Predicate.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * <p>A predicate on records.
+ * Predicates can be used to conditionally apply a {@link org.apache.kafka.connect.transforms.Transformation}
+ * by configuring the transformation's {@code predicate} (and {@code negate}) configuration parameters.
+ * In particular, the {@code Filter} transformation can be conditionally applied in order to filter
+ * certain records from further processing.
+ *
+ * <p>Implementations of this interface must be public and have a public constructor with no parameters.
+ *
+ * @param <R> The type of record.
+ */
+public interface Predicate<R extends ConnectRecord<R>> extends Configurable, AutoCloseable {
+
+    /**
+     * Configuration specification for this predicate.
+     */

Review comment:
       ```suggestion
       /**
        * Configuration specification for this predicate.
        *
        * @return the configuration definition for this predicate; never null
        */
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.transforms.Filter;
+import org.apache.kafka.connect.transforms.predicates.HasHeaderKey;
+import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
+import org.apache.kafka.connect.transforms.predicates.TopicNameMatches;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.PREDICATES_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * An integration test for connectors with transformations
+ */
+@Category(IntegrationTest.class)
+public class TransformationIntegrationTest {
+
+    private static final int NUM_RECORDS_PRODUCED = 2000;
+    private static final int NUM_TOPIC_PARTITIONS = 3;
+    private static final long RECORD_TRANSFER_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
+    private static final long OBSERVED_RECORDS_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
+    private static final int NUM_TASKS = 3;
+    private static final int NUM_WORKERS = 3;
+    private static final String CONNECTOR_NAME = "simple-conn";
+    private static final String SINK_CONNECTOR_CLASS_NAME = MonitorableSinkConnector.class.getSimpleName();
+    private static final String SOURCE_CONNECTOR_CLASS_NAME = MonitorableSourceConnector.class.getSimpleName();
+
+    private EmbeddedConnectCluster connect;
+    private ConnectorHandle connectorHandle;
+
+    @Before
+    public void setup() {
+        // setup Connect worker properties
+        Map<String, String> exampleWorkerProps = new HashMap<>();
+        exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
+
+        // setup Kafka broker properties
+        Properties exampleBrokerProps = new Properties();
+        exampleBrokerProps.put("auto.create.topics.enable", "false");
+
+        // build a Connect cluster backed by Kafka and Zk
+        connect = new EmbeddedConnectCluster.Builder()
+                .name("connect-cluster")
+                .numWorkers(NUM_WORKERS)
+                .numBrokers(1)
+                .workerProps(exampleWorkerProps)
+                .brokerProps(exampleBrokerProps)
+                .build();
+
+        // start the clusters
+        connect.start();
+
+        // get a handle to the connector
+        connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+    }
+
+    @After
+    public void close() {
+        // delete connector handle
+        RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
+
+        // stop all Connect, Kafka and Zk threads.
+        connect.stop();
+    }
+
+    /**
+     * Test the {@link Filter} transformer with a
+     * {@link TopicNameMatches} predicate on a sink connector.
+     */
+    @Test
+    public void testFilterOnTopicNameWithSinkConnector() throws Exception {
+        Map<String, Long> observedRecords = observeRecords();
+
+        // create test topics
+        String fooTopic = "foo-topic";
+        String barTopic = "bar-topic";
+        int numFooRecords = NUM_RECORDS_PRODUCED;
+        int numBarRecords = NUM_RECORDS_PRODUCED;
+        connect.kafka().createTopic(fooTopic, NUM_TOPIC_PARTITIONS);
+        connect.kafka().createTopic(barTopic, NUM_TOPIC_PARTITIONS);
+
+        // setup up props for the sink connector
+        Map<String, String> props = new HashMap<>();
+        props.put("name", CONNECTOR_NAME);
+        props.put(CONNECTOR_CLASS_CONFIG, SINK_CONNECTOR_CLASS_NAME);
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put(TOPICS_CONFIG, String.join(",", fooTopic, barTopic));
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(TRANSFORMS_CONFIG, "filter");
+        props.put(TRANSFORMS_CONFIG + ".filter.type", Filter.class.getSimpleName());
+        props.put(TRANSFORMS_CONFIG + ".filter.predicate", "barPredicate");
+        props.put(PREDICATES_CONFIG, "barPredicate");
+        props.put(PREDICATES_CONFIG + ".barPredicate.type", TopicNameMatches.class.getSimpleName());
+        props.put(PREDICATES_CONFIG + ".barPredicate.pattern", "bar-.*");
+
+        // expect all records to be consumed by the connector
+        connectorHandle.expectedRecords(numFooRecords);
+
+        // expect all records to be consumed by the connector
+        connectorHandle.expectedCommits(numFooRecords);
+
+        // start a sink connector
+        connect.configureConnector(CONNECTOR_NAME, props);

Review comment:
       This is an asynchronous method, and it's likely the connector will not be started and running before the test proceeds to the next statements. This can lead to very flaky tests.
   
   We could instead wait until the connector is actually running, using something like:
   ```
           connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
                   "Connector tasks did not start in time.");
   ```
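
The race described above can be sketched with the JDK alone. This is an illustration of the "start asynchronously, then poll until running" pattern, not the Connect test framework; `WaitForStartSketch` and `waitUntil` are hypothetical names, and the `AtomicBoolean` stands in for the connector's running state.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

class WaitForStartSketch {

    // Poll the condition until it holds, or fail with the message once the timeout elapses.
    static void waitUntil(BooleanSupplier condition, long timeoutMs, String message) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError(message);
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        AtomicBoolean running = new AtomicBoolean(false);
        // Stand-in for an asynchronous start request: it returns immediately
        // and the flag flips some time later on another thread.
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            running.set(true);
        });
        // Without this wait, the next statement could observe running == false.
        waitUntil(running::get, 5_000, "Connector tasks did not start in time.");
        System.out.println("running = " + running.get());
    }
}
```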

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.util.RegexValidator;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+/**
+ * A predicate which is true for records with a topic name that matches the configured regular expression.
+ * @param <R> The type of connect record.
+ */
+public class TopicNameMatches<R extends ConnectRecord<R>> implements Predicate<R> {
+
+    private static final String PATTERN_CONFIG = "pattern";
+    private static final ConfigDef CONFIG_DEF = new ConfigDef().define(PATTERN_CONFIG, ConfigDef.Type.STRING, ".*",
+            new RegexValidator(), ConfigDef.Importance.MEDIUM,
+            "A Java regular expression for matching against the name of a record's topic.");

Review comment:
       The default doesn't match the KIP. Either we should update the KIP to accept `.*` as the default pattern, or we should use `ConfigDef.NO_DEFAULT_VALUE` as the default to require the `pattern` to be set.
   

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.util.RegexValidator;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+/**
+ * A predicate which is true for records with a topic name that matches the configured regular expression.
+ * @param <R> The type of connect record.
+ */
+public class TopicNameMatches<R extends ConnectRecord<R>> implements Predicate<R> {
+
+    private static final String PATTERN_CONFIG = "pattern";
+    private static final ConfigDef CONFIG_DEF = new ConfigDef().define(PATTERN_CONFIG, ConfigDef.Type.STRING, ".*",
+            new RegexValidator(), ConfigDef.Importance.MEDIUM,
+            "A Java regular expression for matching against the name of a record's topic.");
+    private Pattern pattern;
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public boolean test(R record) {
+        return record.topic() != null && pattern.matcher(record.topic()).matches();
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        SimpleConfig simpleConfig = new SimpleConfig(config(), configs);
+        Pattern result;
+        String value = simpleConfig.getString(PATTERN_CONFIG);
+        try {
+            result = Pattern.compile(value);
+        } catch (PatternSyntaxException e) {
+            throw new ConfigException(PATTERN_CONFIG, value, "entry must be a Java-compatible regular expression: " + e.getMessage());
+        }

Review comment:
       Can we ever get to line 64? The constructor of the config (line 58) should already fail if the `pattern` value is not a valid regex, because the validator checks it. That means that if we make it past line 58, then line 62 will never fail.
   
   Am I missing something?
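
A minimal sketch of the reasoning in this comment, assuming the validator and the later compile step both use `java.util.regex.Pattern`. `ValidateThenCompileSketch`, `ensureValidRegex` and `configure` are hypothetical stand-ins, not the `RegexValidator` or `SimpleConfig` classes from the diff.

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

class ValidateThenCompileSketch {

    // Stand-in for a config validator: rejects values that are not valid regexes.
    static void ensureValidRegex(String name, String value) {
        try {
            Pattern.compile(value);
        } catch (PatternSyntaxException e) {
            throw new IllegalArgumentException("Invalid value for " + name + ": " + e.getDescription());
        }
    }

    // Stand-in for configure(): validation runs first, as the config constructor would.
    static Pattern configure(String value) {
        ensureValidRegex("pattern", value);
        // Reached only for values that have already compiled once, so this call
        // cannot throw PatternSyntaxException and needs no second try/catch.
        return Pattern.compile(value);
    }

    public static void main(String[] args) {
        System.out.println(configure("bar-.*").matcher("bar-topic").matches());
        try {
            configure("[unclosed");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected during validation");
        }
    }
}
```

If the validator accepts exactly the strings that `Pattern.compile` accepts, the second `try`/`catch` is dead code, which is the point the comment raises.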







[GitHub] [kafka] tombentley commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r429072047



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -257,12 +273,25 @@ public boolean includeRecordDetailsInErrorLog() {
         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
                         .getDeclaredConstructor().newInstance();
-                transformation.configure(originalsWithPrefix(prefix));
-                transformations.add(transformation);
+                Map<String, Object> configs = originalsWithPrefix(prefix);
+                Object predicateAlias = configs.remove("predicate");
+                Object negate = configs.remove("negate");
+                transformation.configure(configs);
+                if (predicateAlias != null) {
+                    String predicatePrefix = "predicates." + predicateAlias + ".";
+                    @SuppressWarnings("unchecked")
+                    Predicate<R> predicate = getClass(predicatePrefix + "type").asSubclass(Predicate.class)
+                            .getDeclaredConstructor().newInstance();
+                    predicate.configure(originalsWithPrefix(predicatePrefix));
+                    transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));

Review comment:
       My idea was to let the `PredicatedTransformation` instantiate its delegate. I think that could be done, but `PredicatedTransformation` cannot instantiate its `Predicate`, because the configs for that are under the `predicates.<key>` prefix rather than `transforms.<key>`, so there is no single `Map` that you could pass to `PredicatedTransformation.configure()` to let it instantiate both the transformation and the predicate. We could instantiate the predicate in `ConnectorConfig` but pass a `Map` and let the `PredicatedTransformation` instantiate the `Transformation`; then, however, transformation instantiation would happen in different places depending on whether it is predicated or not, so it doesn't seem worth it. I'm back to "if you prefer the consistency of using `configure()`, I'm happy to do it", so just let me know.
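
For readers unfamiliar with the decorator under discussion, here is a minimal sketch of the idea using only JDK functional interfaces. It is not the `PredicatedTransformation` class from the pull request: `PredicatedSketch` is a hypothetical name, `UnaryOperator` stands in for a transformation, and the `negate ^ predicate` rule is an assumption based on the class's Javadoc ("applies the delegate only when a Predicate is true (or false, according to negate)").

```java
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

class PredicatedSketch<R> implements UnaryOperator<R> {

    private final Predicate<R> predicate;
    private final boolean negate;
    private final UnaryOperator<R> delegate;

    // Both collaborators are built by the caller and handed in through the constructor.
    PredicatedSketch(Predicate<R> predicate, boolean negate, UnaryOperator<R> delegate) {
        this.predicate = predicate;
        this.negate = negate;
        this.delegate = delegate;
    }

    @Override
    public R apply(R record) {
        // Apply the delegate when the predicate result differs from negate;
        // otherwise pass the record through unchanged.
        if (negate ^ predicate.test(record)) {
            return delegate.apply(record);
        }
        return record;
    }

    public static void main(String[] args) {
        // A filter-like delegate that drops the record by returning null.
        UnaryOperator<String> drop = topic -> null;
        PredicatedSketch<String> dropBar = new PredicatedSketch<>(t -> t.startsWith("bar-"), false, drop);
        System.out.println(dropBar.apply("foo-topic"));
        System.out.println(dropBar.apply("bar-topic"));
    }
}
```

Passing already-built collaborators through the constructor keeps instantiation in one place, which is the trade-off weighed in the comment above against a `configure()`-based design.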







[GitHub] [kafka] kkonstantine commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-635143830


   retest this please





[GitHub] [kafka] tombentley commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r431305276



##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+/**
+ * A predicate which is true for records with at least one header with the configured name.
+ * @param <R> The type of connect record.
+ */
+public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R> {
+
+    private static final String NAME_CONFIG = "name";
+    private static final ConfigDef CONFIG_DEF = new ConfigDef().define(NAME_CONFIG, ConfigDef.Type.STRING, null,

Review comment:
       Ah, thanks, I'd not realised that was the point of `ConfigDef.NO_DEFAULT_VALUE`. 







[GitHub] [kafka] rhauch merged pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
rhauch merged pull request #8699:
URL: https://github.com/apache/kafka/pull/8699


   





[GitHub] [kafka] rhauch commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-634846871


   ok to test





[GitHub] [kafka] tombentley commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r431308724



##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.util.RegexValidator;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+/**
+ * A predicate which is true for records with a topic name that matches the configured regular expression.
+ * @param <R> The type of connect record.
+ */
+public class TopicNameMatches<R extends ConnectRecord<R>> implements Predicate<R> {
+
+    private static final String PATTERN_CONFIG = "pattern";
+    private static final ConfigDef CONFIG_DEF = new ConfigDef().define(PATTERN_CONFIG, ConfigDef.Type.STRING, ".*",
+            new RegexValidator(), ConfigDef.Importance.MEDIUM,
+            "A Java regular expression for matching against the name of a record's topic.");

Review comment:
       I changed it to `.*` only when I realised that the default had to be valid and before I knew about `NO_DEFAULT_VALUE`, so using `NO_DEFAULT_VALUE` is good. Thanks!







[GitHub] [kafka] tombentley commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-634585597


   Rebased for conflict.
   
   @kkonstantine I've addressed those first comments, thanks! Still some work on the integration test (not passing when run via gradle).





[GitHub] [kafka] kkonstantine commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r430867370



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -257,12 +274,23 @@ public boolean includeRecordDetailsInErrorLog() {
         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+
             try {
                 @SuppressWarnings("unchecked")
-                final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
-                        .getDeclaredConstructor().newInstance();
-                transformation.configure(originalsWithPrefix(prefix));
-                transformations.add(transformation);
+                final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
+                Map<String, Object> configs = originalsWithPrefix(prefix);
+                Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
+                Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                transformation.configure(configs);
+                if (predicateAlias != null) {
+                    String predicatePrefix = "predicates." + predicateAlias + ".";

Review comment:
       I'd suggest declaring a `PREDICATES_PREFIX` variable for higher visibility. 
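
A small sketch of what such a constant could look like, together with the prefix-stripping lookup it feeds. It uses only the JDK; `PrefixSketch` and `withPrefixStripped` are hypothetical names, and the helper only approximates what a prefix-based lookup such as `originalsWithPrefix` is assumed to do (select keys by prefix and return them with the prefix removed).

```java
import java.util.HashMap;
import java.util.Map;

class PrefixSketch {

    // Hypothetical constant, following the reviewer's suggestion.
    static final String PREDICATES_PREFIX = "predicates.";

    // Return the entries whose key starts with the prefix, with the prefix removed.
    static Map<String, Object> withPrefixStripped(Map<String, ?> originals, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : originals.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("transforms.filter.predicate", "barPredicate");
        props.put("predicates.barPredicate.type", "TopicNameMatches");
        props.put("predicates.barPredicate.pattern", "bar-.*");

        // The alias comes from the transformation's "predicate" setting.
        String predicatePrefix = PREDICATES_PREFIX + "barPredicate" + ".";
        System.out.println(withPrefixStripped(props, predicatePrefix));
    }
}
```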

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+/**
+ * Decorator for a {@link Transformation} which applies the delegate only when a
+ * {@link Predicate} is true (or false, according to {@code negate}).
+ * @param <R>
+ */
+class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    static final String PREDICATE_CONFIG = "predicate";
+    static final String NEGATE_CONFIG = "negate";
+    /*test*/ Predicate<R> predicate;

Review comment:
       This type of comment is not something we use elsewhere, and it is not immediately obvious what it means. I'd suggest removing it instead.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
##########
@@ -214,6 +216,187 @@ public void abstractKeyValueTransform() {
         }
     }
 
+

Review comment:
       nit: 2 extra lines

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java
##########
@@ -16,15 +16,17 @@
  */
 package org.apache.kafka.connect.integration;
 
-import org.apache.kafka.connect.errors.DataException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.stream.IntStream;
 
+import org.apache.kafka.connect.errors.DataException;

Review comment:
       nit: same comment as above

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
##########
@@ -16,19 +16,21 @@
  */
 package org.apache.kafka.connect.integration;
 
-import org.apache.kafka.connect.errors.DataException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.apache.kafka.connect.errors.DataException;

Review comment:
       nit: Our current import style says that these imports stay above the java ones. That's why these show up as changes here.  

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
##########
@@ -16,21 +16,23 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import java.util.Collections;

Review comment:
       nit: same as above

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -276,116 +304,251 @@ public boolean includeRecordDetailsInErrorLog() {
      * <p>
      * {@code requireFullConfig} specifies whether required config values that are missing should cause an exception to be thrown.
      */
+    @SuppressWarnings({"rawtypes", "unchecked"})
     public static ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) {
-        Object transformAliases = ConfigDef.parseType(TRANSFORMS_CONFIG, props.get(TRANSFORMS_CONFIG), Type.LIST);
-        if (!(transformAliases instanceof List)) {
-            return baseConfigDef;
-        }
-
         ConfigDef newDef = new ConfigDef(baseConfigDef);
-        LinkedHashSet<?> uniqueTransformAliases = new LinkedHashSet<>((List<?>) transformAliases);
-        for (Object o : uniqueTransformAliases) {
-            if (!(o instanceof String)) {
-                throw new ConfigException("Item in " + TRANSFORMS_CONFIG + " property is not of "
-                        + "type String");
+        new EnrichablePlugin<Transformation<?>>("Transformation", TRANSFORMS_CONFIG, TRANSFORMS_GROUP, (Class) Transformation.class,
+                props, requireFullConfig) {
+            @SuppressWarnings("rawtypes")
+            @Override
+            protected Set<PluginDesc<Transformation<?>>> plugins() {
+                return (Set) plugins.transformations();
             }
-            String alias = (String) o;
-            final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
-            final String group = TRANSFORMS_GROUP + ": " + alias;
-            int orderInGroup = 0;
-
-            final String transformationTypeConfig = prefix + "type";
-            final ConfigDef.Validator typeValidator = new ConfigDef.Validator() {
-                @Override
-                public void ensureValid(String name, Object value) {
-                    getConfigDefFromTransformation(transformationTypeConfig, (Class) value);
-                }
-            };
-            newDef.define(transformationTypeConfig, Type.CLASS, ConfigDef.NO_DEFAULT_VALUE, typeValidator, Importance.HIGH,
-                    "Class for the '" + alias + "' transformation.", group, orderInGroup++, Width.LONG, "Transformation type for " + alias,
-                    Collections.<String>emptyList(), new TransformationClassRecommender(plugins));
 
-            final ConfigDef transformationConfigDef;
-            try {
-                final String className = props.get(transformationTypeConfig);
-                final Class<?> cls = (Class<?>) ConfigDef.parseType(transformationTypeConfig, className, Type.CLASS);
-                transformationConfigDef = getConfigDefFromTransformation(transformationTypeConfig, cls);
-            } catch (ConfigException e) {
-                if (requireFullConfig) {
-                    throw e;
-                } else {
-                    continue;
+            @Override
+            protected ConfigDef initialConfigDef() {
+                // All Transformations get these config parameters implicitly
+                return super.initialConfigDef()
+                        .define(PredicatedTransformation.PREDICATE_CONFIG, Type.STRING, "", Importance.MEDIUM,
+                                "The alias of a predicate used to determine whether to apply this transformation.")
+                        .define(PredicatedTransformation.NEGATE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM,
+                                "Whether the configured predicate should be negated.");
+            }
+
+            @Override
+            protected Stream<Map.Entry<String, ConfigDef.ConfigKey>> configDefsForClass(String typeConfig) {
+                return super.configDefsForClass(typeConfig)
+                    .filter(entry -> {
+                        // The implicit parameters mask any from the transformer with the same name
+                        if (PredicatedTransformation.PREDICATE_CONFIG.equals(entry.getValue())
+                                || PredicatedTransformation.NEGATE_CONFIG.equals(entry.getValue())) {
+                            log.warn("Transformer config {} is masked by implicit config of that name",
+                                    entry.getValue());
+                            return false;
+                        } else {
+                            return true;
+                        }
+                    });
+            }
+
+            @Override
+            protected ConfigDef config(Transformation<?> transformation) {
+                return transformation.config();
+            }
+
+            @Override
+            protected void validateProps(String prefix) {
+                String prefixedNegate = prefix + PredicatedTransformation.NEGATE_CONFIG;
+                String prefixedPredicate = prefix + PredicatedTransformation.PREDICATE_CONFIG;
+                if (props.containsKey(prefixedNegate) &&
+                        !props.containsKey(prefixedPredicate)) {
+                    throw new ConfigException("Config '" + prefixedNegate + "' was provided " +
+                            "but there is no config '" + prefixedPredicate + "' defining a predicate to be negated.");
                 }
             }
+        }.enrich(newDef);
 
-            newDef.embed(prefix, group, orderInGroup, transformationConfigDef);
-        }
+        new EnrichablePlugin<Predicate<?>>("Predicate", PREDICATES_CONFIG, TRANSFORMS_GROUP,
+                (Class) Predicate.class, props, requireFullConfig) {
+            @Override
+            protected Set<PluginDesc<Predicate<?>>> plugins() {
+                return (Set) plugins.predicates();
+            }
 
+            @Override
+            protected ConfigDef config(Predicate<?> predicate) {
+                return predicate.config();
+            }
+        }.enrich(newDef);
         return newDef;
     }
 
     /**
-     * Return {@link ConfigDef} from {@code transformationCls}, which is expected to be a non-null {@code Class<Transformation>},
-     * by instantiating it and invoking {@link Transformation#config()}.
+     * An abstraction over "enrichable plugins" ({@link Transformation}s and {@link Predicate}s) used for computing the
+     * contribution to a Connectors ConfigDef.
+     *
+     * This is not entirely elegant because
+     * although they basically use the same "alias prefix" configuration idiom there are some differences.
+     * The abstract method pattern is used to cope with this.
+     * @param <T> The type of plugin (either {@code Transformation} or {@code Predicate}).
      */
-    static ConfigDef getConfigDefFromTransformation(String key, Class<?> transformationCls) {
-        if (transformationCls == null || !Transformation.class.isAssignableFrom(transformationCls)) {
-            throw new ConfigException(key, String.valueOf(transformationCls), "Not a Transformation");
-        }
-        if (Modifier.isAbstract(transformationCls.getModifiers())) {
-            String childClassNames = Stream.of(transformationCls.getClasses())
-                .filter(transformationCls::isAssignableFrom)
-                .filter(c -> !Modifier.isAbstract(c.getModifiers()))
-                .filter(c -> Modifier.isPublic(c.getModifiers()))
-                .map(Class::getName)
-                .collect(Collectors.joining(", "));
-            String message = childClassNames.trim().isEmpty() ?
-                "Transformation is abstract and cannot be created." :
-                "Transformation is abstract and cannot be created. Did you mean " + childClassNames + "?";
-            throw new ConfigException(key, String.valueOf(transformationCls), message);
+    static abstract class EnrichablePlugin<T> {
+
+        private final String aliasKind;
+        private final String aliasConfig;
+        private final String aliasGroup;
+        private final Class<T> baseClass;
+        private final Map<String, String> props;
+        private final boolean requireFullConfig;
+
+        public EnrichablePlugin(
+                String aliasKind,
+                String aliasConfig, String aliasGroup, Class<T> baseClass,
+                Map<String, String> props, boolean requireFullConfig) {
+            this.aliasKind = aliasKind;
+            this.aliasConfig = aliasConfig;
+            this.aliasGroup = aliasGroup;
+            this.baseClass = baseClass;
+            this.props = props;
+            this.requireFullConfig = requireFullConfig;
         }
-        Transformation transformation;
-        try {
-            transformation = transformationCls.asSubclass(Transformation.class).getConstructor().newInstance();
-        } catch (Exception e) {
-            ConfigException exception = new ConfigException(key, String.valueOf(transformationCls), "Error getting config definition from Transformation: " + e.getMessage());
-            exception.initCause(e);
-            throw exception;
+
+        /** Add the configs for this alias to the given {@code ConfigDef}. */
+        void enrich(ConfigDef newDef) {
+            Object aliases = ConfigDef.parseType(aliasConfig, props.get(aliasConfig), Type.LIST);
+            if (!(aliases instanceof List)) {
+                return;
+            }
+
+            LinkedHashSet<?> uniqueAliases = new LinkedHashSet<>((List<?>) aliases);
+            for (Object o : uniqueAliases) {
+                if (!(o instanceof String)) {
+                    throw new ConfigException("Item in " + aliasConfig + " property is not of "
+                            + "type String");
+                }
+                String alias = (String) o;
+                final String prefix = aliasConfig + "." + alias + ".";
+                final String group = aliasGroup + ": " + alias;
+                int orderInGroup = 0;
+
+                final String typeConfig = prefix + "type";
+                final ConfigDef.Validator typeValidator = new ConfigDef.Validator() {

Review comment:
       Consider using `LambdaValidator` to be able to add a lambda for the `toString` method of this validator.  
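
   To illustrate the idea without depending on Kafka, here is a self-contained sketch. `ValidatorSketch`, `LambdaValidatorSketch` and `LambdaValidatorDemo` are hypothetical names; they only mimic the shape of a validator built from a check lambda plus a `toString` supplier, and are not the real `ConfigDef` classes.

```java
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical stand-in for ConfigDef.Validator, so the sketch compiles without Kafka.
interface ValidatorSketch {
    void ensureValid(String name, Object value);
}

// Validator built from two lambdas: one performs the check, the other supplies
// the human-readable description returned by toString().
final class LambdaValidatorSketch implements ValidatorSketch {
    private final BiConsumer<String, Object> check;
    private final Supplier<String> description;

    private LambdaValidatorSketch(BiConsumer<String, Object> check, Supplier<String> description) {
        this.check = check;
        this.description = description;
    }

    static LambdaValidatorSketch with(BiConsumer<String, Object> check, Supplier<String> description) {
        return new LambdaValidatorSketch(check, description);
    }

    @Override
    public void ensureValid(String name, Object value) {
        check.accept(name, value);
    }

    @Override
    public String toString() {
        return description.get();
    }
}

class LambdaValidatorDemo {
    // Example use: reject anything that is not a Class, and describe the constraint.
    static ValidatorSketch typeValidator(String kind) {
        return LambdaValidatorSketch.with(
            (name, value) -> {
                if (!(value instanceof Class)) {
                    throw new IllegalArgumentException(name + ": not a " + kind + " class: " + value);
                }
            },
            () -> "Any class implementing " + kind);
    }
}
```

   Compared with the anonymous `ConfigDef.Validator` in the diff, the lambda form lets the validator describe itself, which is useful wherever validators are rendered as text.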

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java
##########
@@ -37,22 +39,26 @@
     private final ConnectorHandle connectorHandle;
     private final AtomicInteger partitionsAssigned = new AtomicInteger(0);
     private final StartAndStopCounter startAndStopCounter = new StartAndStopCounter();
+    private final Consumer<SinkRecord> consumer;
 
     private CountDownLatch recordsRemainingLatch;
     private CountDownLatch recordsToCommitLatch;
     private int expectedRecords = -1;
     private int expectedCommits = -1;
 
-    public TaskHandle(ConnectorHandle connectorHandle, String taskId) {
-        log.info("Created task {} for connector {}", taskId, connectorHandle);
+    public TaskHandle(ConnectorHandle connectorHandle, String taskId, Consumer<SinkRecord> consumer) {
         this.taskId = taskId;
         this.connectorHandle = connectorHandle;
+        this.consumer = consumer;
     }
 
     /**
      * Record a message arrival at the task and the connector overall.
      */
-    public void record() {

Review comment:
       This might break existing tests that depend on Connect's integration test framework. It's probably a good idea to keep it, in which case the consumer should be ignored.
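
   A sketch of what keeping both forms could look like is below. `TaskHandleSketch` is a hypothetical, simplified stand-in for `TaskHandle`, and the type parameter `T` plays the role of `SinkRecord`; the real class tracks latches and more state than this.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for TaskHandle; T plays the role of SinkRecord.
class TaskHandleSketch<T> {
    private final AtomicInteger recordsSeen = new AtomicInteger(0);
    private final Consumer<T> consumer;

    TaskHandleSketch(Consumer<T> consumer) {
        this.consumer = consumer;
    }

    // Original no-argument form, kept so existing tests still compile; the consumer is ignored.
    public void record() {
        recordsSeen.incrementAndGet();
    }

    // New form: hand the record to the consumer (if one was supplied), then count it as before.
    public void record(T record) {
        if (consumer != null) {
            consumer.accept(record);
        }
        record();
    }

    public int recordsSeen() {
        return recordsSeen.get();
    }
}
```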







[GitHub] [kafka] tombentley commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r429066772



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -257,12 +273,25 @@ public boolean includeRecordDetailsInErrorLog() {
         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
                         .getDeclaredConstructor().newInstance();
-                transformation.configure(originalsWithPrefix(prefix));
-                transformations.add(transformation);
+                Map<String, Object> configs = originalsWithPrefix(prefix);
+                Object predicateAlias = configs.remove("predicate");
+                Object negate = configs.remove("negate");
+                transformation.configure(configs);
+                if (predicateAlias != null) {
+                    String predicatePrefix = "predicates." + predicateAlias + ".";
+                    @SuppressWarnings("unchecked")
+                    Predicate<R> predicate = getClass(predicatePrefix + "type").asSubclass(Predicate.class)
+                            .getDeclaredConstructor().newInstance();
+                    predicate.configure(originalsWithPrefix(predicatePrefix));
+                    transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));

Review comment:
       A more general question is "Why does `PredicatedTransformation` have a special constructor rather than use `configure(Map<String, ?>)`?" This arises because `PredicatedTransformation` is a bit special. In particular it has to be passed an already configured `Predicate` and a `Transformation`, which you couldn't do with a normal `Transformation` (which can only be configured with the types supported by `ConfigDef`).
   
   I guess we could use `configure(Map<String, ?>)`, but then we have to instantiate a Map at this call site only to unpick it again in `PredicatedTransformation`, which feels like more work than just having a constructor. But if you prefer the consistency of using `configure()` I'm happy to do it.







[GitHub] [kafka] tombentley commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-635196787


   @rhauch @kkonstantine thanks very much for making those fixes for me, I appreciate the time that must've taken. I've made one more trivial correction to the integration test.





[GitHub] [kafka] rhauch commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-635072158









[GitHub] [kafka] C0urante commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r430058102



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+/**
+ * Decorator for a {@link Transformation} which applies the delegate only when a
+ * {@link Predicate} is true (or false, according to {@code negate}).
+ * @param <R>
+ */
+class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    /*test*/ final Predicate<R> predicate;
+    /*test*/ final Transformation<R> delegate;
+    /*test*/ final boolean negate;
+
+    PredicatedTransformation(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
+        this.predicate = predicate;
+        this.negate = negate;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+
+    }
+
+    @Override
+    public R apply(R record) {
+        if (negate ^ predicate.test(record)) {
+            return delegate.apply(record);
+        }
+        return record;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return null;

Review comment:
       Hmmm... I think there might be some awkwardness here with trying to make `PredicatedTransformation` implement the `Transformation` interface. Could we replace every `Transformation` in the `TransformationChain`'s [transformation list](https://github.com/apache/kafka/blob/de6468ae5915298279e229dc64721e01e7d14fab/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java#L33) with a `PredicatedTransformation` and, if the user has configured no predicate for a transform, make the default behavior of the `PredicatedTransformation` class to blindly apply its transformation?
   
   This would solve a few problems:
   
   - No risk of users trying to actually use a `PredicatedTransformation` in a connector config, which they may try to do if we don't add logic to prevent it from being picked up during plugin path scanning on startup and logged as an SMT plugin
   - No need to implement methods that aren't used
   - One code path instead of two for application of transformations
   - More flexibility in instantiation and, possibly, the ability to encapsulate some of the `ConfigDef` generation logic in a separate class from `ConnectorConfig` (haven't looked into the specifics of this yet so may not actually be feasible or that elegant)
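
   The "always wrap" idea could be sketched roughly as follows. The class name is hypothetical, and `java.util.function.Predicate` and `UnaryOperator` stand in for Connect's `Predicate` and `Transformation`; treating a null predicate as "none configured" is an assumption made for illustration.

```java
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical sketch; java.util.function types stand in for Connect's Predicate and Transformation.
final class AlwaysWrappedTransformationSketch<R> {
    private final Predicate<R> predicate; // null means "no predicate configured"
    private final boolean negate;
    private final UnaryOperator<R> delegate;

    AlwaysWrappedTransformationSketch(Predicate<R> predicate, boolean negate, UnaryOperator<R> delegate) {
        this.predicate = predicate;
        this.negate = negate;
        this.delegate = delegate;
    }

    // Single code path: with no predicate the delegate is always applied; otherwise it is
    // applied when the (possibly negated) predicate matches, and the record passes through
    // unchanged when it does not.
    R apply(R record) {
        if (predicate == null || (negate ^ predicate.test(record))) {
            return delegate.apply(record);
        }
        return record;
    }
}
```

   Because this wrapper does not implement the `Transformation` interface, it has no unused `configure()` or `config()` methods to stub out.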

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -257,12 +273,25 @@ public boolean includeRecordDetailsInErrorLog() {
         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
                         .getDeclaredConstructor().newInstance();
-                transformation.configure(originalsWithPrefix(prefix));
-                transformations.add(transformation);
+                Map<String, Object> configs = originalsWithPrefix(prefix);
+                Object predicateAlias = configs.remove("predicate");
+                Object negate = configs.remove("negate");

Review comment:
       Hmmm... wish we'd caught that earlier. Seems safer to just leave the properties in, but unless we want to call for a re-vote and an extension on the KIP deadline guess we'll have to keep this as-is.







[GitHub] [kafka] tombentley commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-632551475


   @C0urante thanks for the review, some excellent points there! I think an integration test is a great idea, which I'll work on next. I've addressed all your other comments. 





[GitHub] [kafka] rhauch removed a comment on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
rhauch removed a comment on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-634846388









[GitHub] [kafka] kkonstantine commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r431542321



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
##########
@@ -128,6 +131,10 @@ public void testAllowedConnectFrameworkClasses() {
         assertTrue(PluginUtils.shouldLoadInIsolation(
                 "org.apache.kafka.connect.transforms.ExtractField$Key")
         );
+        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.transforms.predicates."));
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.transforms.predicates.TopicNameMatches")
+        );

Review comment:
       Given the small number of predicates, I agree with @rhauch 







[GitHub] [kafka] rhauch edited a comment on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
rhauch edited a comment on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-635072158


   Builds were all aborted: `Aborted by user anonymous`
   * https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2554/
   * https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6574/
   * https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/713/





[GitHub] [kafka] tombentley commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r429081838



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+/**
+ * Decorator for a {@link Transformation} which applies the delegate only when a
+ * {@link Predicate} is true (or false, according to {@code negate}).
+ * @param <R>
+ */
+class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    /*test*/ final Predicate<R> predicate;
+    /*test*/ final Transformation<R> delegate;
+    /*test*/ final boolean negate;
+
+    PredicatedTransformation(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
+        this.predicate = predicate;
+        this.negate = negate;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+
+    }
+
+    @Override
+    public R apply(R record) {
+        if (negate ^ predicate.test(record)) {
+            return delegate.apply(record);
+        }
+        return record;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return null;

Review comment:
       This is related to the issue discussed about `configure()`.
   
   We could return an empty `ConfigDef` here, but that would be a lie which could ultimately lead to some other error if someone tried to use it with `configure()`.
   
   We can't invent a `ConfigDef` schema for this because the `PredicatedTransformation` would need to know about the `Transformation` it was going to wrap, but it can't know that before it has been configured with at least the `Transformation`'s `ConfigDef`, and it can't be configured before `config()` has been called. So we have a chicken-and-egg problem. Something (`ConnectorConfig`) must either have some _a priori_ knowledge of `PredicatedTransformation`'s `ConfigDef`, or know how to configure it without needing to call `config()` at all.
   
   Since `PredicatedTransformation` is a purely internal class which will never be directly exposed to Connect users, we're not obliged to stick to the contract of `config()` and `configure()`. So both `PredicatedTransformation.config` and `PredicatedTransformation.configure` can throw when called, since we know no one else can call them and we know `ConnectorConfig` never will.
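   
   To make the proposal concrete, here is a rough sketch of a wrapper whose `config()` and `configure()` throw and whose `apply()` uses the same `negate ^ predicate.test(record)` check as the diff. It is only an illustration: `PredicatedSketch` is a hypothetical name, and it uses `java.util.function` types as stand-ins for Connect's `Transformation` and `Predicate` so that it compiles on its own.
   
   ```java
   import java.util.Map;
   import java.util.function.Predicate;
   import java.util.function.UnaryOperator;
   
   // Hypothetical stand-in for the internal wrapper; not the class in this PR.
   final class PredicatedSketch<R> {
       private final Predicate<R> predicate;
       private final boolean negate;
       private final UnaryOperator<R> delegate;
   
       // The owner (ConnectorConfig in the PR) wires everything up through the constructor.
       PredicatedSketch(Predicate<R> predicate, boolean negate, UnaryOperator<R> delegate) {
           this.predicate = predicate;
           this.negate = negate;
           this.delegate = delegate;
       }
   
       // Throws instead of pretending to honour the usual configure() contract.
       void configure(Map<String, ?> configs) {
           throw new UnsupportedOperationException("configured via constructor only");
       }
   
       // Throws instead of returning an empty (and misleading) config definition.
       Object config() {
           throw new UnsupportedOperationException("no ConfigDef for an internal wrapper");
       }
   
       // Applies the delegate when the predicate result XOR negate is true;
       // otherwise the record passes through unchanged.
       R apply(R record) {
           return (negate ^ predicate.test(record)) ? delegate.apply(record) : record;
       }
   }
   ```
   
   With `negate` set to `false` the delegate runs only for records that match the predicate; with `negate` set to `true` it runs only for those that do not.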
   







[GitHub] [kafka] rhauch commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-635363510


   Thanks, @tombentley. Merging with 3 green builds that Jenkins did not tie back to this PR.





[GitHub] [kafka] rhauch commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-634846388









[GitHub] [kafka] tombentley commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-631651186


   Still need to review test coverage, but @kkonstantine, @mimaison, @bbejeck you might want to give it an initial pass.





[GitHub] [kafka] tombentley commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r431652348



##########
File path: connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class HasHeaderKeyTest {
+

Review comment:
       The empty string is a valid regex, so I used `CompositeValidator.of(new NonEmptyString(), new RegexValidator())` in `TopicNameMatches`, rather than just `new RegexValidator()`.
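   
   A small sketch of why both checks are needed, using only `java.util.regex`. The `validatePattern` helper is hypothetical and only mimics what the two validators do together; it is not Kafka's validator API, and the messages are borrowed from the error strings mentioned in this review.
   
   ```java
   import java.util.regex.Pattern;
   import java.util.regex.PatternSyntaxException;
   
   final class PatternCheckSketch {
       // Hypothetical helper: returns null when the value is acceptable,
       // otherwise a short description of the problem.
       static String validatePattern(String value) {
           // Pattern.compile("") succeeds, so emptiness has to be rejected separately.
           if (value == null || value.isEmpty()) {
               return "String must be non-empty";
           }
           try {
               Pattern.compile(value);
               return null;
           } catch (PatternSyntaxException e) {
               return "Invalid regex: " + e.getDescription();
           }
       }
   }
   ```
   
   A regex-only check would accept `""`, which is why the non-empty check comes first.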







[GitHub] [kafka] rhauch commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-635081325


   Jenkins jobs were never associated with this PR (see [INFRA-20344](https://issues.apache.org/jira/browse/INFRA-20344)), but here are the jobs for the latest commit:
   * https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/715
   * https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6576
   * https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2556





[GitHub] [kafka] rhauch commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-635007497


   retest this please





[GitHub] [kafka] rhauch commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r431328464



##########
File path: connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatchesTest.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TopicNameMatchesTest {
+

Review comment:
       Maybe add a few more tests that exercise the `ConfigDef` and `AbstractConfig` validation:
   ```
       @Test
       public void testPatternRequiredInConfig() {
           Map<String, String> props = new HashMap<>();
           ConfigException e = assertThrows(ConfigException.class, () -> config(props));
           assertTrue(e.getMessage().contains("Missing required configuration \"pattern\""));
       }
   
       @Test
       public void testPatternMayNotBeEmptyInConfig() {
           Map<String, String> props = new HashMap<>();
           props.put("pattern", "");
           ConfigException e = assertThrows(ConfigException.class, () -> config(props));
           assertTrue(e.getMessage().contains("String must be non-empty"));
       }
   
       @Test
       public void testPatternIsValidRegexInConfig() {
           Map<String, String> props = new HashMap<>();
           props.put("pattern", "[");
           ConfigException e = assertThrows(ConfigException.class, () -> config(props));
           assertTrue(e.getMessage().contains("Invalid regex"));
       }
   
       protected SimpleConfig config(Map<String, String> props) {
           return new SimpleConfig(new TopicNameMatches().config(), props);
       }
   ```

##########
File path: connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class HasHeaderKeyTest {
+

Review comment:
       Maybe add a few more tests that exercise the `ConfigDef` and `AbstractConfig` validation:
   ```
       @Test
       public void testNameRequiredInConfig() {
           Map<String, String> props = new HashMap<>();
           ConfigException e = assertThrows(ConfigException.class, () -> config(props));
           assertTrue(e.getMessage().contains("Missing required configuration \"name\""));
       }
   
       @Test
       public void testNameMayNotBeEmptyInConfig() {
           Map<String, String> props = new HashMap<>();
           props.put("name", "");
           ConfigException e = assertThrows(ConfigException.class, () -> config(props));
           assertTrue(e.getMessage().contains("String must be non-empty"));
       }
   
       protected SimpleConfig config(Map<String, String> props) {
           return new SimpleConfig(new HasHeaderKey().config(), props);
       }
   ```
   BTW, note that `new HasHeaderKey().config()` is required because there is no accessible static `ConfigDef`. You might want to just make the static field package-private.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
##########
@@ -128,6 +131,10 @@ public void testAllowedConnectFrameworkClasses() {
         assertTrue(PluginUtils.shouldLoadInIsolation(
                 "org.apache.kafka.connect.transforms.ExtractField$Key")
         );
+        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.transforms.predicates."));
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.transforms.predicates.TopicNameMatches")
+        );

Review comment:
       Should we also check the other two predicate implementations? I know it's not strictly required, but it would help ensure that the pattern matches more than just one implementation.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
##########
@@ -214,6 +216,185 @@ public void abstractKeyValueTransform() {
         }
     }
 
+    @Test(expected = ConfigException.class)
+    public void wrongPredicateType() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", SimpleTransformation.class.getName());
+        props.put("transforms.a.magic.number", "42");
+        props.put("transforms.a.predicate", "my-pred");
+        props.put("predicates", "my-pred");
+        props.put("predicates.my-pred.type", TestConnector.class.getName());
+        new ConnectorConfig(MOCK_PLUGINS, props);
+    }
+
+    @Test
+    public void singleConditionalTransform() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", SimpleTransformation.class.getName());
+        props.put("transforms.a.magic.number", "42");
+        props.put("transforms.a.predicate", "my-pred");
+        props.put("transforms.a.negate", "true");
+        props.put("predicates", "my-pred");
+        props.put("predicates.my-pred.type", TestPredicate.class.getName());
+        props.put("predicates.my-pred.int", "84");
+        assertPredicatedTransform(props, true);
+    }
+
+    @Test
+    public void predicateNegationDefaultsToFalse() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", SimpleTransformation.class.getName());
+        props.put("transforms.a.magic.number", "42");
+        props.put("transforms.a.predicate", "my-pred");
+        props.put("predicates", "my-pred");
+        props.put("predicates.my-pred.type", TestPredicate.class.getName());
+        props.put("predicates.my-pred.int", "84");
+        assertPredicatedTransform(props, false);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void abstractPredicate() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", SimpleTransformation.class.getName());
+        props.put("transforms.a.magic.number", "42");
+        props.put("transforms.a.predicate", "my-pred");
+        props.put("predicates", "my-pred");
+        props.put("predicates.my-pred.type", AbstractTestPredicate.class.getName());
+        props.put("predicates.my-pred.int", "84");
+        assertPredicatedTransform(props, false);
+    }
+
+    private void assertPredicatedTransform(Map<String, String> props, boolean expectedNegated) {
+        final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
+        final List<Transformation<R>> transformations = config.transformations();
+        assertEquals(1, transformations.size());
+        assertTrue(transformations.get(0) instanceof PredicatedTransformation);
+        PredicatedTransformation<?> predicated = (PredicatedTransformation<?>) transformations.get(0);
+
+        assertEquals(expectedNegated, predicated.negate);
+
+        assertTrue(predicated.delegate instanceof ConnectorConfigTest.SimpleTransformation);
+        assertEquals(42, ((SimpleTransformation<?>) predicated.delegate).magicNumber);
+
+        assertTrue(predicated.predicate instanceof ConnectorConfigTest.TestPredicate);
+        assertEquals(84, ((TestPredicate<?>) predicated.predicate).param);
+
+        predicated.close();
+
+        assertEquals(0, ((SimpleTransformation<?>) predicated.delegate).magicNumber);
+        assertEquals(0, ((TestPredicate<?>) predicated.predicate).param);
+    }
+
+    @Test
+    public void misconfiguredPredicate() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", SimpleTransformation.class.getName());
+        props.put("transforms.a.magic.number", "42");
+        props.put("transforms.a.predicate", "my-pred");
+        props.put("transforms.a.negate", "true");
+        props.put("predicates", "my-pred");
+        props.put("predicates.my-pred.type", TestPredicate.class.getName());
+        props.put("predicates.my-pred.int", "79");
+        try {
+            new ConnectorConfig(MOCK_PLUGINS, props);
+            fail();
+        } catch (ConfigException e) {
+            assertTrue(e.getMessage().contains("Value must be at least 80"));
+        }
+    }
+
+    @Ignore("Is this really an error. There's no actual need for the predicates config (unlike transforms where it defines the order).")

Review comment:
       It'd be good to have the test reflect the current behavior.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
##########
@@ -214,6 +216,185 @@ public void abstractKeyValueTransform() {
         }
     }
 
+    @Test(expected = ConfigException.class)
+    public void wrongPredicateType() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", SimpleTransformation.class.getName());
+        props.put("transforms.a.magic.number", "42");
+        props.put("transforms.a.predicate", "my-pred");
+        props.put("predicates", "my-pred");
+        props.put("predicates.my-pred.type", TestConnector.class.getName());
+        new ConnectorConfig(MOCK_PLUGINS, props);
+    }
+
+    @Test
+    public void singleConditionalTransform() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", SimpleTransformation.class.getName());
+        props.put("transforms.a.magic.number", "42");
+        props.put("transforms.a.predicate", "my-pred");
+        props.put("transforms.a.negate", "true");
+        props.put("predicates", "my-pred");
+        props.put("predicates.my-pred.type", TestPredicate.class.getName());
+        props.put("predicates.my-pred.int", "84");
+        assertPredicatedTransform(props, true);
+    }
+
+    @Test
+    public void predicateNegationDefaultsToFalse() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", SimpleTransformation.class.getName());
+        props.put("transforms.a.magic.number", "42");
+        props.put("transforms.a.predicate", "my-pred");
+        props.put("predicates", "my-pred");
+        props.put("predicates.my-pred.type", TestPredicate.class.getName());
+        props.put("predicates.my-pred.int", "84");
+        assertPredicatedTransform(props, false);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void abstractPredicate() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", SimpleTransformation.class.getName());
+        props.put("transforms.a.magic.number", "42");
+        props.put("transforms.a.predicate", "my-pred");
+        props.put("predicates", "my-pred");
+        props.put("predicates.my-pred.type", AbstractTestPredicate.class.getName());
+        props.put("predicates.my-pred.int", "84");
+        assertPredicatedTransform(props, false);
+    }
+
+    private void assertPredicatedTransform(Map<String, String> props, boolean expectedNegated) {
+        final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
+        final List<Transformation<R>> transformations = config.transformations();
+        assertEquals(1, transformations.size());
+        assertTrue(transformations.get(0) instanceof PredicatedTransformation);
+        PredicatedTransformation<?> predicated = (PredicatedTransformation<?>) transformations.get(0);
+
+        assertEquals(expectedNegated, predicated.negate);
+
+        assertTrue(predicated.delegate instanceof ConnectorConfigTest.SimpleTransformation);
+        assertEquals(42, ((SimpleTransformation<?>) predicated.delegate).magicNumber);
+
+        assertTrue(predicated.predicate instanceof ConnectorConfigTest.TestPredicate);
+        assertEquals(84, ((TestPredicate<?>) predicated.predicate).param);
+
+        predicated.close();
+
+        assertEquals(0, ((SimpleTransformation<?>) predicated.delegate).magicNumber);
+        assertEquals(0, ((TestPredicate<?>) predicated.predicate).param);
+    }
+
+    @Test
+    public void misconfiguredPredicate() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", SimpleTransformation.class.getName());
+        props.put("transforms.a.magic.number", "42");
+        props.put("transforms.a.predicate", "my-pred");
+        props.put("transforms.a.negate", "true");
+        props.put("predicates", "my-pred");
+        props.put("predicates.my-pred.type", TestPredicate.class.getName());
+        props.put("predicates.my-pred.int", "79");
+        try {
+            new ConnectorConfig(MOCK_PLUGINS, props);
+            fail();
+        } catch (ConfigException e) {
+            assertTrue(e.getMessage().contains("Value must be at least 80"));
+        }

Review comment:
       We've moved to using `assertThrows` here, which would look something like:
   ```suggestion
           ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
           assertTrue(e.getMessage().contains("Value must be at least 80"));
   ```
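   
   For anyone unfamiliar with the idiom, a rough stand-in showing what an `assertThrows`-style helper does: it runs the body, fails if nothing is thrown, and returns the caught exception so that its message can be asserted on. The `AssertThrowsSketch` class below is hypothetical and written only so the example compiles without JUnit; it is not JUnit's implementation.
   
   ```java
   final class AssertThrowsSketch {
       // Minimal functional interface so the body may throw checked exceptions.
       interface ThrowingRunnable {
           void run() throws Throwable;
       }
   
       // Returns the thrown exception if it has the expected type;
       // fails with AssertionError if nothing, or something else, is thrown.
       static <T extends Throwable> T assertThrows(Class<T> expected, ThrowingRunnable body) {
           try {
               body.run();
           } catch (Throwable t) {
               if (expected.isInstance(t)) {
                   return expected.cast(t);
               }
               throw new AssertionError("unexpected exception type: " + t.getClass().getName(), t);
           }
           throw new AssertionError("expected " + expected.getName() + " to be thrown");
       }
   }
   ```
   
   Compared with `try { ...; fail(); } catch (...)`, the expected exception type and the statement under test sit on one line, and a forgotten `fail()` can no longer let the test pass silently.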
   







[GitHub] [kafka] tombentley commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r431653319



##########
File path: connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class HasHeaderKeyTest {
+

Review comment:
       Oops, replied to the wrong comment, but I'm sure you guessed what I meant.







[GitHub] [kafka] tombentley commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r429064491



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -257,12 +273,25 @@ public boolean includeRecordDetailsInErrorLog() {
         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
                         .getDeclaredConstructor().newInstance();
-                transformation.configure(originalsWithPrefix(prefix));
-                transformations.add(transformation);
+                Map<String, Object> configs = originalsWithPrefix(prefix);
+                Object predicateAlias = configs.remove("predicate");
+                Object negate = configs.remove("negate");

Review comment:
       The [compatibility section](https://cwiki.apache.org/confluence/display/KAFKA/KIP-585%3A+Filter+and+Conditional+SMTs#KIP-585:FilterandConditionalSMTs-Compatibility,Deprecation,andMigrationPlan) of the KIP says that if a transformation already has these configs then they'll be masked by the new implicit configs. If we don't remove them here, we'd be passing the KIP-585 configs to a transformation which had its own semantics for those config keys, which would be incorrect.
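   
   A simplified sketch of the masking, using plain maps. `ImplicitConfigSketch`, `withPrefix` and `transformConfigs` are hypothetical names for illustration; `withPrefix` only approximates what `originalsWithPrefix` does in the diff above.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   final class ImplicitConfigSketch {
       // Copies the entries whose key starts with the prefix, with the prefix stripped.
       static Map<String, Object> withPrefix(Map<String, String> props, String prefix) {
           Map<String, Object> result = new HashMap<>();
           for (Map.Entry<String, String> e : props.entrySet()) {
               String key = e.getKey();
               if (key.startsWith(prefix) && key.length() > prefix.length()) {
                   result.put(key.substring(prefix.length()), e.getValue());
               }
           }
           return result;
       }
   
       // Builds the config map handed to the transformation itself: the implicit
       // "predicate" and "negate" keys are removed so the transformation never sees them.
       static Map<String, Object> transformConfigs(Map<String, String> props, String alias) {
           Map<String, Object> configs = withPrefix(props, "transforms." + alias + ".");
           configs.remove("predicate");
           configs.remove("negate");
           return configs;
       }
   }
   ```
   
   For properties like those in `singleConditionalTransform`, the transformation would be configured with `type` and `magic.number` only.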







[GitHub] [kafka] rhauch commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-635077545


   ok to test





[GitHub] [kafka] tombentley commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-631683003


   @C0urante you might also want to take a look.





[GitHub] [kafka] rhauch commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-635007361









[GitHub] [kafka] rhauch commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-635034899


   retest this please

