Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/27 16:48:35 UTC

[GitHub] [kafka] rhauch commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

rhauch commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r431281159



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.transforms.Filter;
+import org.apache.kafka.connect.transforms.predicates.HasHeaderKey;
+import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
+import org.apache.kafka.connect.transforms.predicates.TopicNameMatches;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.PREDICATES_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * An integration test for connectors with transformations
+ */
+@Category(IntegrationTest.class)
+public class TransformationIntegrationTest {
+
+    private static final int NUM_RECORDS_PRODUCED = 2000;
+    private static final int NUM_TOPIC_PARTITIONS = 3;
+    private static final long RECORD_TRANSFER_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
+    private static final long OBSERVED_RECORDS_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
+    private static final int NUM_TASKS = 3;
+    private static final int NUM_WORKERS = 3;
+    private static final String CONNECTOR_NAME = "simple-conn";
+    private static final String SINK_CONNECTOR_CLASS_NAME = MonitorableSinkConnector.class.getSimpleName();
+    private static final String SOURCE_CONNECTOR_CLASS_NAME = MonitorableSourceConnector.class.getSimpleName();
+
+    private EmbeddedConnectCluster connect;
+    private ConnectorHandle connectorHandle;
+
+    @Before
+    public void setup() {
+        // setup Connect worker properties
+        Map<String, String> exampleWorkerProps = new HashMap<>();
+        exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
+
+        // setup Kafka broker properties
+        Properties exampleBrokerProps = new Properties();
+        exampleBrokerProps.put("auto.create.topics.enable", "false");
+
+        // build a Connect cluster backed by Kafka and Zk
+        connect = new EmbeddedConnectCluster.Builder()
+                .name("connect-cluster")
+                .numWorkers(NUM_WORKERS)
+                .numBrokers(1)
+                .workerProps(exampleWorkerProps)
+                .brokerProps(exampleBrokerProps)
+                .build();
+
+        // start the clusters
+        connect.start();

Review comment:
       Should we wait until all brokers and Connect workers are available, via something like:
   ```
           connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Brokers did not start in time.");
        connect.assertions().assertExactlyNumWorkersAreUp(numWorkers, "Workers did not start in time.");
   ```
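
    For this test, those counts would presumably be the values given to the builder, i.e. one broker and `NUM_WORKERS` workers:
    ```
            connect.assertions().assertExactlyNumBrokersAreUp(1, "Brokers did not start in time.");
            connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, "Workers did not start in time.");
    ```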

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/transforms/predicates/Predicate.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * <p>A predicate on records.
+ * Predicates can be used to conditionally apply a {@link org.apache.kafka.connect.transforms.Transformation}
+ * by configuring the transformation's {@code predicate} (and {@code negate}) configuration parameters.
+ * In particular, the {@code Filter} transformation can be conditionally applied in order to filter
+ * certain records from further processing.
+ *
+ * <p>Implementations of this interface must be public and have a public constructor with no parameters.
+ *
+ * @param <R> The type of record.
+ */
+public interface Predicate<R extends ConnectRecord<R>> extends Configurable, AutoCloseable {
+
+    /**
+     * Configuration specification for this predicate.
+     */
+    ConfigDef config();
+
+    /**
+     * Returns whether the given record satisfies this predicate.
+     */

Review comment:
       ```suggestion
       /**
        * Returns whether the given record satisfies this predicate.
        *
        * @param record the record to evaluate; may not be null
        * @return true if the predicate matches, or false otherwise
        */
   ```
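
    For background, the `predicate` (and `negate`) wiring that the interface Javadoc describes is plain connector configuration; a rough sketch in the style of the integration test in this PR, with a made-up predicate name `myPredicate`:
    ```
            props.put(TRANSFORMS_CONFIG, "filter");
            props.put(TRANSFORMS_CONFIG + ".filter.type", Filter.class.getSimpleName());
            props.put(TRANSFORMS_CONFIG + ".filter.predicate", "myPredicate");
            props.put(TRANSFORMS_CONFIG + ".filter.negate", "false");
            props.put(PREDICATES_CONFIG, "myPredicate");
            props.put(PREDICATES_CONFIG + ".myPredicate.type", TopicNameMatches.class.getSimpleName());
            props.put(PREDICATES_CONFIG + ".myPredicate.pattern", "foo-.*");
    ```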

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+/**
+ * A predicate which is true for records with at least one header with the configured name.
+ * @param <R> The type of connect record.
+ */
+public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R> {
+
+    private static final String NAME_CONFIG = "name";
+    private static final ConfigDef CONFIG_DEF = new ConfigDef().define(NAME_CONFIG, ConfigDef.Type.STRING, null,

Review comment:
       Here the default is null, which means that configuration validation allows the `name` field to be left unset. Per the KIP, we want to require that `name` be set; to do that, we should use `ConfigDef.NO_DEFAULT_VALUE`:
   ```suggestion
       private static final ConfigDef CONFIG_DEF = new ConfigDef().define(NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
   ```
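
    For anyone unfamiliar with the distinction: `NO_DEFAULT_VALUE` marks the key as required, so validation fails fast when it is missing. A minimal sketch of the behavior this buys us:
    ```
    // With NO_DEFAULT_VALUE the key is required: parsing a config map that
    // omits "name" throws a ConfigException rather than silently yielding null.
    ConfigDef def = new ConfigDef().define("name", ConfigDef.Type.STRING,
            ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "The header name.");
    def.parse(java.util.Collections.emptyMap()); // throws ConfigException
    ```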

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/transforms/predicates/Predicate.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * <p>A predicate on records.
+ * Predicates can be used to conditionally apply a {@link org.apache.kafka.connect.transforms.Transformation}
+ * by configuring the transformation's {@code predicate} (and {@code negate}) configuration parameters.
+ * In particular, the {@code Filter} transformation can be conditionally applied in order to filter
+ * certain records from further processing.
+ *
+ * <p>Implementations of this interface must be public and have a public constructor with no parameters.
+ *
+ * @param <R> The type of record.
+ */
+public interface Predicate<R extends ConnectRecord<R>> extends Configurable, AutoCloseable {
+
+    /**
+     * Configuration specification for this predicate.
+     */

Review comment:
       ```suggestion
       /**
        * Configuration specification for this predicate.
        *
        * @return the configuration definition for this predicate; never null
        */
   ```
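
    Relatedly, for anyone following along, a minimal implementation of this interface might look like the following sketch (a hypothetical predicate matching records with a null key; not part of this PR):
    ```
    import java.util.Map;

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.ConnectRecord;

    /**
     * Hypothetical example: true for records whose key is null.
     */
    public class HasNullKey<R extends ConnectRecord<R>> implements Predicate<R> {

        private static final ConfigDef CONFIG_DEF = new ConfigDef();

        @Override
        public ConfigDef config() {
            return CONFIG_DEF;
        }

        @Override
        public boolean test(R record) {
            return record.key() == null;
        }

        @Override
        public void configure(Map<String, ?> configs) {
            // nothing to configure
        }

        @Override
        public void close() {
            // nothing to clean up
        }
    }
    ```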

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.transforms.Filter;
+import org.apache.kafka.connect.transforms.predicates.HasHeaderKey;
+import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
+import org.apache.kafka.connect.transforms.predicates.TopicNameMatches;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.PREDICATES_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * An integration test for connectors with transformations
+ */
+@Category(IntegrationTest.class)
+public class TransformationIntegrationTest {
+
+    private static final int NUM_RECORDS_PRODUCED = 2000;
+    private static final int NUM_TOPIC_PARTITIONS = 3;
+    private static final long RECORD_TRANSFER_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
+    private static final long OBSERVED_RECORDS_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
+    private static final int NUM_TASKS = 3;
+    private static final int NUM_WORKERS = 3;
+    private static final String CONNECTOR_NAME = "simple-conn";
+    private static final String SINK_CONNECTOR_CLASS_NAME = MonitorableSinkConnector.class.getSimpleName();
+    private static final String SOURCE_CONNECTOR_CLASS_NAME = MonitorableSourceConnector.class.getSimpleName();
+
+    private EmbeddedConnectCluster connect;
+    private ConnectorHandle connectorHandle;
+
+    @Before
+    public void setup() {
+        // setup Connect worker properties
+        Map<String, String> exampleWorkerProps = new HashMap<>();
+        exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
+
+        // setup Kafka broker properties
+        Properties exampleBrokerProps = new Properties();
+        exampleBrokerProps.put("auto.create.topics.enable", "false");
+
+        // build a Connect cluster backed by Kafka and Zk
+        connect = new EmbeddedConnectCluster.Builder()
+                .name("connect-cluster")
+                .numWorkers(NUM_WORKERS)
+                .numBrokers(1)
+                .workerProps(exampleWorkerProps)
+                .brokerProps(exampleBrokerProps)
+                .build();
+
+        // start the clusters
+        connect.start();
+
+        // get a handle to the connector
+        connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+    }
+
+    @After
+    public void close() {
+        // delete connector handle
+        RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
+
+        // stop all Connect, Kafka and Zk threads.
+        connect.stop();
+    }
+
+    /**
+     * Test the {@link Filter} transformer with a
+     * {@link TopicNameMatches} predicate on a sink connector.
+     */
+    @Test
+    public void testFilterOnTopicNameWithSinkConnector() throws Exception {
+        Map<String, Long> observedRecords = observeRecords();
+
+        // create test topics
+        String fooTopic = "foo-topic";
+        String barTopic = "bar-topic";
+        int numFooRecords = NUM_RECORDS_PRODUCED;
+        int numBarRecords = NUM_RECORDS_PRODUCED;
+        connect.kafka().createTopic(fooTopic, NUM_TOPIC_PARTITIONS);
+        connect.kafka().createTopic(barTopic, NUM_TOPIC_PARTITIONS);
+
+        // setup up props for the sink connector
+        Map<String, String> props = new HashMap<>();
+        props.put("name", CONNECTOR_NAME);
+        props.put(CONNECTOR_CLASS_CONFIG, SINK_CONNECTOR_CLASS_NAME);
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put(TOPICS_CONFIG, String.join(",", fooTopic, barTopic));
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(TRANSFORMS_CONFIG, "filter");
+        props.put(TRANSFORMS_CONFIG + ".filter.type", Filter.class.getSimpleName());
+        props.put(TRANSFORMS_CONFIG + ".filter.predicate", "barPredicate");
+        props.put(PREDICATES_CONFIG, "barPredicate");
+        props.put(PREDICATES_CONFIG + ".barPredicate.type", TopicNameMatches.class.getSimpleName());
+        props.put(PREDICATES_CONFIG + ".barPredicate.pattern", "bar-.*");
+
+        // expect all records to be consumed by the connector
+        connectorHandle.expectedRecords(numFooRecords);
+
+        // expect all records to be consumed by the connector
+        connectorHandle.expectedCommits(numFooRecords);
+
+        // start a sink connector
+        connect.configureConnector(CONNECTOR_NAME, props);

Review comment:
       This is an asynchronous method, so the connector will likely not yet be started and running when the test proceeds to the next statements. That can make for very flaky tests.
   
   We could instead wait until the connector is actually running, using something like:
   ```
           connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
                   "Connector tasks did not start in time.");
   ```
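
    A hand-rolled alternative with the `waitForCondition` helper this test already imports would also work, assuming `EmbeddedConnectCluster` exposes `connectorStatus()`, though the assertions helper above is tidier because it handles REST errors internally:
    ```
            waitForCondition(() -> {
                try {
                    return connect.connectorStatus(CONNECTOR_NAME).tasks().size() >= NUM_TASKS;
                } catch (Exception e) {
                    return false; // the connector may not have been created yet
                }
            }, RECORD_TRANSFER_DURATION_MS, "Connector tasks did not start in time.");
    ```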

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.util.RegexValidator;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+/**
+ * A predicate which is true for records with a topic name that matches the configured regular expression.
+ * @param <R> The type of connect record.
+ */
+public class TopicNameMatches<R extends ConnectRecord<R>> implements Predicate<R> {
+
+    private static final String PATTERN_CONFIG = "pattern";
+    private static final ConfigDef CONFIG_DEF = new ConfigDef().define(PATTERN_CONFIG, ConfigDef.Type.STRING, ".*",
+            new RegexValidator(), ConfigDef.Importance.MEDIUM,
+            "A Java regular expression for matching against the name of a record's topic.");

Review comment:
       The default doesn't match the KIP. Either we should update the KIP to accept `.*` as the default pattern, or we should use `ConfigDef.NO_DEFAULT_VALUE` as the default to require the `pattern` to be set.
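
    If we go the required-config route, the define call would presumably become:
    ```
        private static final ConfigDef CONFIG_DEF = new ConfigDef().define(PATTERN_CONFIG,
                ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
                new RegexValidator(), ConfigDef.Importance.MEDIUM,
                "A Java regular expression for matching against the name of a record's topic.");
    ```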
   

##########
File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.util.RegexValidator;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+/**
+ * A predicate which is true for records with a topic name that matches the configured regular expression.
+ * @param <R> The type of connect record.
+ */
+public class TopicNameMatches<R extends ConnectRecord<R>> implements Predicate<R> {
+
+    private static final String PATTERN_CONFIG = "pattern";
+    private static final ConfigDef CONFIG_DEF = new ConfigDef().define(PATTERN_CONFIG, ConfigDef.Type.STRING, ".*",
+            new RegexValidator(), ConfigDef.Importance.MEDIUM,
+            "A Java regular expression for matching against the name of a record's topic.");
+    private Pattern pattern;
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public boolean test(R record) {
+        return record.topic() != null && pattern.matcher(record.topic()).matches();
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        SimpleConfig simpleConfig = new SimpleConfig(config(), configs);
+        Pattern result;
+        String value = simpleConfig.getString(PATTERN_CONFIG);
+        try {
+            result = Pattern.compile(value);
+        } catch (PatternSyntaxException e) {
+            throw new ConfigException(PATTERN_CONFIG, value, "entry must be a Java-compatible regular expression: " + e.getMessage());
+        }

Review comment:
       Can we ever get to line 64? The constructor of the config (line 58) runs the `pattern` validator, which fails if the pattern is not a valid regex; so if we make it past line 58, the `Pattern.compile` call on line 62 should never throw.
   
   Am I missing something?
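
    If so, `configure` could presumably be simplified to lean on the validator:
    ```
        @Override
        public void configure(Map<String, ?> configs) {
            // The RegexValidator has already rejected invalid patterns while the
            // SimpleConfig was constructed, so this compile should not throw.
            SimpleConfig simpleConfig = new SimpleConfig(config(), configs);
            this.pattern = Pattern.compile(simpleConfig.getString(PATTERN_CONFIG));
        }
    ```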




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org