You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Mickael Maison (Jira)" <ji...@apache.org> on 2023/02/14 14:32:00 UTC

[jira] [Assigned] (KAFKA-13632) MirrorMaker 2.0 NPE and Warning "Failure to commit records" for filtered records

     [ https://issues.apache.org/jira/browse/KAFKA-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mickael Maison reassigned KAFKA-13632:
--------------------------------------

    Assignee: Rens Groothuijsen

> MirrorMaker 2.0 NPE and Warning "Failure to commit records" for filtered records
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-13632
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13632
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.1.0, 3.2.0, 3.3.0
>            Reporter: Bert Baron
>            Assignee: Rens Groothuijsen
>            Priority: Minor
>             Fix For: 3.4.0
>
>
> We have a setup where we filter records with MirrorMaker 2.0 (see below). This results in the following warning messages as a result of NPE's in MirrorSourceTask.commitRecord for each filtered record:
> {code:java}
> [2022-01-31 08:01:29,581] WARN [MirrorSourceConnector|task-0] Failure committing record. (org.apache.kafka.connect.mirror.MirrorSourceTask:190)
> java.lang.NullPointerException
> 	at org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177)
> 	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463)
> 	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
> 	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
> 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
> 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
> 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 	at java.base/java.lang.Thread.run(Thread.java:829) {code}
> The reason seems to be that for filtered records metadata is null. Note that in the overridden SourceTask.commitRecord the javadoc clearly states that metadata will be null if the record was filtered.
> In our case we use a custom predicate, but the issue can be reproduced with the following configuration:
> {code:java}
> clusters = source,target
> tasks.max = 1
> source.bootstrap.servers = <cluster1>
> target.bootstrap.servers = <cluster2>
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> source->target.enabled = true
> source->target.topics = topic1
> source->target.transforms=Filter
> source->target.transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
> source->target.transforms.Filter.predicate=HeaderPredicate
> source->target.predicates=HeaderPredicate
> source->target.predicates.HeaderPredicate.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
> source->target.predicates.HeaderPredicate.name=someheader
>  {code}
> Each record with the header key 'someheader' will result in the NPE and warning message.
> On a side note, we couldn't find clear documentation on how to configure (SMT) filtering with MirrorMaker 2 or whether this is supported at all, but apart from the NPE's and warning messages this seems to functionally work for us with our custom filter.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)