You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@nifi.apache.org by na...@bt.com on 2021/03/01 07:20:31 UTC

RE: Converting Records

Hey Mark,

The Validate Record Processor was what I was looking for!

As we are using the _0_10 version of the produce record processor, we don’t seem to have any of the transactional properties listed? I’m guessing this is on the newer Kafka API processors?

Kind regards,

Nathan

From: Mark Payne [mailto:markap14@hotmail.com]
Sent: 25 February 2021 17:08
To: users@nifi.apache.org
Subject: Re: Converting Records

Nathan,

Have you taken a look at ValidateRecord yet? That should allow you to separate out the record that do not match the schema, before trying to publish to Kafka. Regarding the fact that some messages are already delivered to Kafka when you encounter a failure - this can be avoided if you configure the publisher to use transactions when sending to Kafka. Either all records in a FlowFile will go, or none of them will go. However, once you’ve separated out the invalid records, I suspect this will be less of a concern.

Thanks
-Mark



On Feb 25, 2021, at 7:44 AM, nathan.english@bt.com<ma...@bt.com> wrote:

Hi All,

I’ve had a look at various processors and the documentation and can’t seem to find any information. So I’m hoping someone may have an idea or point me in the right direction.

We consume from various data sources, normalise the data and then produce the records to Kafka in Avro format, using the PublishKafkaRecord processor (specifically 0_10).

However, one data source occasionally sends a message that cannot be converted from JSON to Avro as the field has no default value on the Avro schema.  We understand why this happens, and it wouldn’t be a massive issue if the flow file only contained on message. However, our flow files have up to 1000 individual records. When one record in the array is invalid in the flowfile, the whole flowfile fails, even if it’s already produced some of the Kafka Topic messages in that flow file. This, to us, feels far from ideal as we didn’t realise this was the case till we tried to reprocess the file with the error still in place. We saw a massive increase of records published to Kafka because all records before the invalid record were republished multiple times (looped back queue), whilst we tried to understand the error.

Is there any processors or other solutions we can use that will validate the flowfile content is valid before we produce to Kafka and only reject the individual records that are invalid? For example, I have a flowfile with 100 records, 2 of which and not valid against the schema. 98 of them are successful and continue to be published, and the 2 that are invalid get rejected and go down a failure queue.

We are on NiFi v1.11.4, and are looking to upgrade to V1.12.1 in the coming months

Kind Regards,

Nathan