Posted to jira@kafka.apache.org by "Tom Bentley (Jira)" <ji...@apache.org> on 2020/03/27 08:35:00 UTC

[jira] [Commented] (KAFKA-9673) Conditionally apply SMTs

    [ https://issues.apache.org/jira/browse/KAFKA-9673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17068418#comment-17068418 ] 

Tom Bentley commented on KAFKA-9673:
------------------------------------

I opened [KIP-585|https://cwiki.apache.org/confluence/display/KAFKA/KIP-585%3A+Conditional+SMT] for discussion.

> Conditionally apply SMTs
> ------------------------
>
>                 Key: KAFKA-9673
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9673
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Tom Bentley
>            Assignee: Tom Bentley
>            Priority: Major
>
> KAFKA-7052 ended up using an IAE with a message, rather than an NPE, when an SMT is applied to a record lacking a given field. It's still not possible to apply an SMT conditionally, which is what things like Debezium really need in order to apply transformations only to non-schema-change events.
> [~rhauch] suggested a mechanism to conditionally apply any SMT but was concerned about the possibility of a naming collision (assuming it was configured by a simple config).
> I'd like to propose something which would solve this problem without the possibility of such collisions. The idea is to have a higher-level condition, which applies an arbitrary transformation (or transformation chain) according to some predicate on the record.
> More concretely, it might be configured like this:
> {noformat}
>   transforms.conditionalExtract.type: Conditional
>   transforms.conditionalExtract.transforms: extractInt
>   transforms.conditionalExtract.transforms.extractInt.type: org.apache.kafka.connect.transforms.ExtractField$Key
>   transforms.conditionalExtract.transforms.extractInt.field: c1
>   transforms.conditionalExtract.condition: topic-matches:<someRegexHere>
> {noformat}
> * The {{Conditional}} SMT is configured with its own list of transforms ({{transforms.conditionalExtract.transforms}}) to apply. This would work just like the top level {{transforms}} config, so subkeys can be used to configure these transforms in the usual way.
> * The {{condition}} config defines the predicate for when the transforms are applied to a record, using a {{<condition-type>:<parameters>}} syntax.
> We could initially support three condition types:
> *{{topic-matches:<pattern>}}* The transformation would be applied if the record's topic name matched the given regular expression pattern. For example, the following would apply the transformation on records being sent to any topic with a name beginning with "my-prefix-":
> {noformat}
>        transforms.conditionalExtract.condition: topic-matches:my-prefix-.*
> {noformat}
>    
> *{{has-header:<header-name>}}* The transformation would be applied if the record had at least one header with the given name. For example, the following will apply the transformation on records with at least one header with the name "my-header":
> {noformat}
>        transforms.conditionalExtract.condition: has-header:my-header
> {noformat}
>    
> *{{not:<condition-name>}}* This would negate the result of another named condition, which is defined using the {{condition}} config prefix. For example, the following will apply the transformation on records which lack any header with the name "my-header":
> {noformat}
>       transforms.conditionalExtract.condition: not:hasMyHeader
>       transforms.conditionalExtract.condition.hasMyHeader: has-header:my-header
> {noformat}
> I foresee one implementation concern with this approach: currently {{Transformation}} has to return a fixed {{ConfigDef}}, and this proposal would require something more flexible, so that the config parameters can depend on the listed transform aliases (and similarly on the named predicates used for the {{not:}} predicate). I think this could be done by adding a {{default}} method to {{Transformation}} for getting the {{ConfigDef}} given the config, for example.
> Obviously this would require a KIP, but before I spend any more time on this I'd be interested in your thoughts [~rhauch], [~rmoff], [~gunnar.morling].
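
To make the {{<condition-type>:<parameters>}} syntax in the quoted description more concrete, here is a minimal sketch of how the three proposed condition types might be parsed and evaluated. It deliberately avoids the Kafka Connect API: the class {{ConditionSketch}}, the interface {{Condition}} and the method {{parse}} are hypothetical names, a record is reduced to its topic name and the set of its header names, and it assumes that "matches" means the pattern must match the whole topic name. It does not guard against named conditions that refer to each other in a cycle.

```java
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

public class ConditionSketch {

    /** Hypothetical predicate over a record, reduced to topic name and header names. */
    public interface Condition {
        boolean test(String topic, Set<String> headerNames);
    }

    /**
     * Parses a "<condition-type>:<parameters>" string.
     * namedConditions maps a condition name to its own spec, as in
     * "condition.hasMyHeader: has-header:my-header".
     */
    public static Condition parse(String spec, Map<String, String> namedConditions) {
        int colon = spec.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException(
                "Expected <condition-type>:<parameters>, got: " + spec);
        }
        String type = spec.substring(0, colon);
        String params = spec.substring(colon + 1);
        switch (type) {
            case "topic-matches": {
                // Assumption: the regex must match the entire topic name.
                Pattern pattern = Pattern.compile(params);
                return (topic, headers) -> pattern.matcher(topic).matches();
            }
            case "has-header":
                return (topic, headers) -> headers.contains(params);
            case "not": {
                // The parameter names another condition, which is looked up and negated.
                String inner = namedConditions.get(params);
                if (inner == null) {
                    throw new IllegalArgumentException("Unknown named condition: " + params);
                }
                Condition delegate = parse(inner, namedConditions);
                return (topic, headers) -> !delegate.test(topic, headers);
            }
            default:
                throw new IllegalArgumentException("Unknown condition type: " + type);
        }
    }
}
```

With this sketch, {{parse("not:hasMyHeader", Map.of("hasMyHeader", "has-header:my-header"))}} yields a condition that holds only for records without a "my-header" header, mirroring the last example in the description.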

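The implementation concern in the description (a fixed {{ConfigDef}} versus one that depends on the configured aliases) might be illustrated along the following lines. This is only a sketch under stated assumptions: {{SketchTransformation}} is a stand-in and not Kafka Connect's {{Transformation}} interface, a plain list of key names stands in for {{ConfigDef}}, and the overloaded {{configKeys(Map)}} default method is the hypothetical addition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DynamicConfigSketch {

    /** Stand-in for a transformation; NOT the real Kafka Connect interface. */
    public interface SketchTransformation {
        /** The fixed set of config keys, analogous to a fixed ConfigDef. */
        List<String> configKeys();

        /** Hypothetical default method: config keys derived from the supplied config. */
        default List<String> configKeys(Map<String, String> props) {
            return configKeys(); // existing transformations keep their fixed definition
        }
    }

    /** Stand-in for the proposed Conditional SMT. */
    public static class Conditional implements SketchTransformation {
        @Override
        public List<String> configKeys() {
            return List.of("transforms", "condition");
        }

        @Override
        public List<String> configKeys(Map<String, String> props) {
            List<String> keys = new ArrayList<>(configKeys());
            // Each alias listed under "transforms" contributes its own "type" key.
            for (String alias : props.getOrDefault("transforms", "").split(",")) {
                String trimmed = alias.trim();
                if (!trimmed.isEmpty()) {
                    keys.add("transforms." + trimmed + ".type");
                }
            }
            return keys;
        }
    }
}
```

Because the new method has a default body that delegates to the fixed definition, existing implementations would not need to change; only a transformation whose keys depend on its aliases would override it.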


--
This message was sent by Atlassian Jira
(v8.3.4#803005)