Posted to dev@nifi.apache.org by timF <ti...@fogarty.org> on 2015/09/12 08:49:34 UTC

custom processor - parse flowFile to many kafka messages

I need to create a custom processor.  

GetFile --> MyProcessor --> PutKafka

The incoming flowFile will be a very large text file. Each row of the file
will need to be parsed, put into its own JSON object, and then sent to a
Kafka topic.  My question is the following: Do I need to write each JSON
object to its own output flowFile?  That is, if the input file contains N
rows and I want N messages to show up in the Kafka topic, do I create N
output flowFiles?



--
View this message in context: http://apache-nifi-developer-list.39713.n7.nabble.com/custom-processor-parse-flowFile-to-many-kafka-messages-tp2782.html
Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.

Re: custom processor - parse flowFile to many kafka messages

Posted by József Mészáros <jo...@impresstv.com>.
Tim,

I needed a very similar workflow: I had a bunch of CSV files containing
web tracking events, and I wanted to convert every line to JSON and then
push each one to Kafka as a separate message. The solution was:

GetFile --> ConvertCSVToAvro --> ConvertAvroToJson --> PutKafka

It does not split your huge file(s) into a separate flow file per line;
instead, it converts your content to Apache Avro format
(https://avro.apache.org/docs/current/).

I had tab-separated files, which were not supported by the original
ConvertCSVToAvro implementation, so I created a tiny patch:

   - JIRA issue: https://issues.apache.org/jira/browse/NIFI-944
   - Github PR: https://github.com/apache/nifi/pull/87 (waiting for merge)

The ConvertAvroToJson processor exposed the Avro records as an array of JSON
objects on a single line, which was not appropriate for my scenario, so I
added a new boolean property that determines how Avro records are exposed:
either as a sequence of single objects (false), writing every object to a
new line, or as an array of objects (true). The details of this
modification:

   - JIRA issue : https://issues.apache.org/jira/browse/NIFI-945
   - Github PR : https://github.com/apache/nifi/pull/88 (waiting for merge)
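
For illustration, with two made-up records the two settings would produce
roughly the following (field names are invented for the example):

property = false (one JSON object per line):
{"userId":"u1","event":"click"}
{"userId":"u2","event":"view"}

property = true (a single JSON array on one line, the previous behaviour):
[{"userId":"u1","event":"click"},{"userId":"u2","event":"view"}]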


Besides the Avro-based solution, I created a direct csv2json converter using
the Jackson CSV extension (https://github.com/FasterXML/jackson-dataformat-csv).
It converts CSV files directly to JSON and does not use Avro as an
intermediate format. This custom processor is not published yet, but if you
think it would be helpful, I can create a JIRA issue and a GitHub PR.
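
In the meantime, a standalone sketch of the same idea with jackson-dataformat-csv
could look roughly like this (illustrative only, not the processor mentioned above;
it assumes tab-separated input with a header row, and the class name and file
arguments are made up):

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.util.Map;

import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class CsvToJsonLines {

    public static void main(String[] args) throws Exception {
        // Read a tab-separated file with a header row; each record becomes a Map.
        CsvMapper csvMapper = new CsvMapper();
        CsvSchema schema = CsvSchema.emptySchema().withHeader().withColumnSeparator('\t');
        ObjectMapper jsonMapper = new ObjectMapper();

        try (MappingIterator<Map<String, String>> records =
                     csvMapper.readerFor(Map.class).with(schema).readValues(new File(args[0]));
             BufferedWriter out = new BufferedWriter(new FileWriter(args[1]))) {
            while (records.hasNext()) {
                // One JSON object per line, so each line can later become one Kafka message.
                out.write(jsonMapper.writeValueAsString(records.next()));
                out.newLine();
            }
        }
    }
}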

I hope it helps you.

Joe


On Mon, Sep 14, 2015 at 5:23 AM, timF <ti...@fogarty.org> wrote:

> Thanks for all the feedback.  Looking at the source code for SplitText, I
> see
> that it parses the input FlowFile, storing the created output FlowFiles in
> a
> list, and then at the end sends the list all at once with a single call to
> session.transfer().  This could be a problem when there are millions of
> records in the input file.
>
> Is there a technical reason why SplitText creates all the output flow files
> before sending them out?  If I were to write my own split process, or a
> combination of GetFile and SplitText where I read the input file line by
> line, can I create an output flow file, send it out, then create the next
> one, send it out, etc?
>
> Does the next processor in the flow get the flow file as soon as it is sent
> with session.transfer?
>
>
>
> --
> View this message in context:
> http://apache-nifi-developer-list.39713.n7.nabble.com/custom-processor-parse-flowFile-to-many-kafka-messages-tp2782p2803.html
> Sent from the Apache NiFi Developer List mailing list archive at
> Nabble.com.
>

RE: custom processor - parse flowFile to many kafka messages

Posted by Mark Payne <ma...@hotmail.com>.
Tim,

All operations on FlowFiles are performed along session boundaries. Nothing is
passed to the next Processor until the session is committed. This way, the session
could be rolled back and everything is restored as it was.

So for your case, it may make sense to try processing the larger file, if you can,
without splitting it into individual records.

There is another solution, albeit a bit less clean: instead of splitting each
FlowFile directly into 1-line FlowFiles, you could use SplitText to split it into, say,
10,000-line chunks. Then, you could use another SplitText to split each of those into
single lines. This way, you avoid having millions of FlowFiles buffered up all at once.
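
To make the session behaviour concrete, here is a rough sketch of what a splitting
processor's onTrigger could look like (illustrative only, not SplitText's actual code;
the class name, relationship name, and toJson() helper are made up, and error handling
is omitted). Even though session.transfer() is called for every split, nothing reaches
the next processor until the framework commits the session after onTrigger returns:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

public class SplitRowsToJson extends AbstractProcessor {

    static final Relationship REL_SPLITS = new Relationship.Builder()
            .name("splits").description("One FlowFile per input row").build();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SPLITS);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final FlowFile original = session.get();
        if (original == null) {
            return;
        }

        final List<FlowFile> splits = new ArrayList<>();
        session.read(original, in -> {
            // Stream the (possibly huge) content; do not buffer it all in memory.
            final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
            String line;
            while ((line = reader.readLine()) != null) {
                final byte[] json = toJson(line);          // hypothetical per-row conversion
                FlowFile split = session.create(original); // child keeps lineage to the original
                split = session.write(split, out -> out.write(json));
                splits.add(split);
            }
        });

        // Transferred here, but only visible downstream once the session commits.
        session.transfer(splits, REL_SPLITS);
        session.remove(original);
    }

    private byte[] toJson(final String row) {
        // placeholder; substitute your real row-to-JSON logic
        return ("{\"row\":\"" + row + "\"}").getBytes(StandardCharsets.UTF_8);
    }
}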

Hope this helps!
-Mark

----------------------------------------
> Date: Sun, 13 Sep 2015 20:23:45 -0700
> From: tim@fogarty.org
> To: dev@nifi.apache.org
> Subject: RE: custom processor - parse flowFile to many kafka messages
>
> Thanks for all the feedback. Looking at the source code for SplitText, I see
> that it parses the input FlowFile, storing the created output FlowFiles in a
> list, and then at the end sends the list all at once with a single call to
> session.transfer(). This could be a problem when there are millions of
> records in the input file.
>
> Is there a technical reason why SplitText creates all the output flow files
> before sending them out? If I were to write my own split process, or a
> combination of GetFile and SplitText where I read the input file line by
> line, can I create an output flow file, send it out, then create the next
> one, send it out, etc?
>
> Does the next processor in the flow get the flow file as soon as it is sent
> with session.transfer?
>
>
>
> --
> View this message in context: http://apache-nifi-developer-list.39713.n7.nabble.com/custom-processor-parse-flowFile-to-many-kafka-messages-tp2782p2803.html
> Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.

RE: custom processor - parse flowFile to many kafka messages

Posted by timF <ti...@fogarty.org>.
Thanks for all the feedback.  Looking at the source code for SplitText, I see
that it parses the input FlowFile, storing the created output FlowFiles in a
list, and then at the end sends the list all at once with a single call to
session.transfer().  This could be a problem when there are millions of
records in the input file.

Is there a technical reason why SplitText creates all the output flow files
before sending them out?  If I were to write my own split process, or a
combination of GetFile and SplitText where I read the input file line by
line, can I create an output flow file, send it out, then create the next
one, send it out, etc?

Does the next processor in the flow get the flow file as soon as it is sent
with session.transfer?  



--
View this message in context: http://apache-nifi-developer-list.39713.n7.nabble.com/custom-processor-parse-flowFile-to-many-kafka-messages-tp2782p2803.html
Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.

RE: custom processor - parse flowFile to many kafka messages

Posted by Mark Payne <ma...@hotmail.com>.
Tim,
There are some changes in 0.3.0 that greatly improve NiFi's performance when dealing with
tons of small FlowFiles, like this. That being said, if you are still concerned about the number
of FlowFiles that you are having to process, you can in fact have a single FlowFile that contains
many messages to put to Kafka.

The PutKafka processor exposes a "Message Delimiter" property that you can use to tell it how to
split up the messages in the FlowFile. So if your messages are new-line delimited, for instance, you can
use a new-line as your delimiter and each line in the FlowFile will be sent to Kafka as a separate message.
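
For example, with Message Delimiter set to a newline, a single FlowFile whose content
is (values made up for illustration):

{"userId":"u1","event":"click"}
{"userId":"u2","event":"view"}
{"userId":"u3","event":"click"}

would be published to the topic as three separate Kafka messages.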
Thanks
-Mark

> Date: Sat, 12 Sep 2015 07:53:59 -0400
> Subject: Re: custom processor - parse flowFile to many kafka messages
> From: joe.witt@gmail.com
> To: dev@nifi.apache.org
> 
> Tim
> 
> Yep.  What you're looking to do should be pretty straight forward.
> 
> One potential option if all 'rows' from the text file are line
> oriented would be:
> 
> GetFile --> SplitText --> YourCustomProc --> PutKafka.
> 
> SplitText will take the input file, regardless of size, and split it
> into a flowfile per line (or lines depending on config).  This way
> each flowFile entering your custom processor will be one 'row'.  Once
> your custom processor has done its thing, presumably converting the
> input format to your desired JSON output, you can then write these
> resulting flowfiles to Kafka.
> 
> Depending on the sophistication of the conversion you may even be able
> to avoid creating a custom processor altogether and simply use the
> ExtractText and ReplaceText processors in its place.  With ExtractText
> you can parse out values of your rows into FlowFile attributes and
> with ReplaceText you can take flow file attributes and replace the
> actual content using the expression language.  It gives you a good bit
> of power and control but not as much as you can give yourself of
> course in a custom processor.
> 
> If you need help with a template showing this or would like to talk
> through various peak performance considerations just let us know.
> 
> Thanks
> Joe
> 
> On Sat, Sep 12, 2015 at 6:00 AM, Rick Braddy <rb...@softnas.com> wrote:
> > Tim,
> >
> > Based on what you describe, and not being familiar with Kafka or your application, it sounds like breaking each row into a flowfile could make sense, depending upon what you're needing to do downstream.  There is overhead associated with each FlowFile, as well as a provenance consideration for what level of granularity you want for the flows.  If there's a more logical way to group multiple JSON objects together as multiple rows that may be more efficient.
> >
> > For throughput reasons, if you have a huge number of rows converting to separate flowfiles, you may want to consider "batching" flowfile creation within your processor (look at how GetFile does this, for example).  This way, each time your processor's onTrigger method gets called, your processor can quickly process and emit NNN number of JSON objects then relinquish control.
> >
> > You said the incoming text file is "very large" - not sure if that's in MB's, GB's or TB. Keep in mind that it will have to be read entirely into the content repository by GetFile before processing, and then your processor will have to deal with streaming that huge file in line by line, parsing and creating the JSON objects.  Not sure if you can accomplish this using the standard Nifi building blocks and expression language, but might be possible.
> >
> > Hope that helps.
> > Rick
> >
> > -----Original Message-----
> > From: timF [mailto:tim@fogarty.org]
> > Sent: Saturday, September 12, 2015 1:50 AM
> > To: dev@nifi.apache.org
> > Subject: custom processor - parse flowFile to many kafka messages
> >
> > I need to create a custom processor.
> >
> > GetFile --> MyProcessor --> PutKafka
> >
> > The incoming flowFile will be a very large text file. Each row of the file will need to be parsed, put into its own json object, and then sent to a kafka topic.  My question is the following: Do I need to write each JSON object to its own output flowFile.  That is if the input file contains N rows, and I want N messages to show up in the kafka topic, do I create N output flowFiles ?
> >
> >
> >
> > --
> > View this message in context: http://apache-nifi-developer-list.39713.n7.nabble.com/custom-processor-parse-flowFile-to-many-kafka-messages-tp2782.html
> > Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.

Re: custom processor - parse flowFile to many kafka messages

Posted by Joe Witt <jo...@gmail.com>.
Tim

Yep.  What you're looking to do should be pretty straightforward.

One potential option if all 'rows' from the text file are line
oriented would be:

GetFile --> SplitText --> YourCustomProc --> PutKafka.

SplitText will take the input file, regardless of size, and split it
into a flowfile per line (or lines depending on config).  This way
each flowFile entering your custom processor will be one 'row'.  Once
your custom processor has done its thing, presumably converting the
input format to your desired JSON output, you can then write these
resulting flowfiles to Kafka.

Depending on the sophistication of the conversion, you may even be able
to avoid creating a custom processor altogether and simply use the
ExtractText and ReplaceText processors in its place.  With ExtractText
you can parse values out of your rows into FlowFile attributes, and
with ReplaceText you can take flow file attributes and replace the
actual content using the expression language.  It gives you a good bit
of power and control, though not as much as you can give yourself, of
course, in a custom processor.
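
For instance (the property name, regex, and field names below are illustrative only,
so double-check against the processor documentation), an ExtractText dynamic property
such as

csv.row  =>  ^([^\t]+)\t([^\t]+)\t([^\t]+)$

adds attributes like csv.row.1, csv.row.2, csv.row.3 to each one-line FlowFile, and a
ReplaceText "Replacement Value" of

{"userId":"${csv.row.1}","event":"${csv.row.2}","ts":"${csv.row.3}"}

rewrites the content as JSON before it reaches PutKafka.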

If you need help with a template showing this or would like to talk
through various peak performance considerations just let us know.

Thanks
Joe

On Sat, Sep 12, 2015 at 6:00 AM, Rick Braddy <rb...@softnas.com> wrote:
> Tim,
>
> Based on what you describe, and not being familiar with Kafka or your application, it sounds like breaking each row into a flowfile could make sense, depending upon what you're needing to do downstream.  There is overhead associated with each FlowFile, as well as a provenance consideration for what level of granularity you want for the flows.  If there's a more logical way to group multiple JSON objects together as multiple rows that may be more efficient.
>
> For throughput reasons, if you have a huge number of rows converting to separate flowfiles, you may want to consider "batching" flowfile creation within your processor (look at how GetFile does this, for example).  This way, each time your processor's onTrigger method gets called, your processor can quickly process and emit NNN number of JSON objects then relinquish control.
>
> You said the incoming text file is "very large" - not sure if that's in MB's, GB's or TB. Keep in mind that it will have to be read entirely into the content repository by GetFile before processing, and then your processor will have to deal with streaming that huge file in line by line, parsing and creating the JSON objects.  Not sure if you can accomplish this using the standard Nifi building blocks and expression language, but might be possible.
>
> Hope that helps.
> Rick
>
> -----Original Message-----
> From: timF [mailto:tim@fogarty.org]
> Sent: Saturday, September 12, 2015 1:50 AM
> To: dev@nifi.apache.org
> Subject: custom processor - parse flowFile to many kafka messages
>
> I need to create a custom processor.
>
> GetFile --> MyProcessor --> PutKafka
>
> The incoming flowFile will be a very large text file. Each row of the file will need to be parsed, put into its own json object, and then sent to a kafka topic.  My question is the following: Do I need to write each JSON object to its own output flowFile.  That is if the input file contains N rows, and I want N messages to show up in the kafka topic, do I create N output flowFiles ?
>
>
>
> --
> View this message in context: http://apache-nifi-developer-list.39713.n7.nabble.com/custom-processor-parse-flowFile-to-many-kafka-messages-tp2782.html
> Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.

RE: custom processor - parse flowFile to many kafka messages

Posted by Rick Braddy <rb...@softnas.com>.
Tim,

Based on what you describe, and not being familiar with Kafka or your application, it sounds like breaking each row into a flowfile could make sense, depending on what you need to do downstream.  There is overhead associated with each FlowFile, as well as a provenance consideration for what level of granularity you want for the flows.  If there's a more logical way to group multiple JSON objects together as multiple rows, that may be more efficient.

For throughput reasons, if you have a huge number of rows converting to separate flowfiles, you may want to consider "batching" flowfile creation within your processor (look at how GetFile does this, for example).  This way, each time your processor's onTrigger method gets called, your processor can quickly process and emit NNN JSON objects and then relinquish control.

You said the incoming text file is "very large" - not sure if that's in MBs, GBs, or TBs. Keep in mind that it will have to be read entirely into the content repository by GetFile before processing, and then your processor will have to deal with streaming that huge file line by line, parsing and creating the JSON objects.  Not sure if you can accomplish this using the standard NiFi building blocks and expression language, but it might be possible.
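
One way to bound the work done per onTrigger call, shown here on the consuming side
rather than the creating side: if one-row FlowFiles are arriving (for example from an
upstream split), the processor can take them in batches with session.get(int) and
convert each to JSON. A rough, illustrative sketch only; the class name, relationship,
batch size, and placeholder conversion are all made up:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

public class RowToJsonBatched extends AbstractProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success").description("Rows converted to JSON").build();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // Grab up to 1,000 one-row FlowFiles per trigger, then yield the thread.
        final List<FlowFile> batch = session.get(1000);
        if (batch.isEmpty()) {
            return;
        }
        for (FlowFile row : batch) {
            final FlowFile json = session.write(row, (in, out) -> {
                final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
                final String line = reader.readLine();
                // Placeholder conversion; replace with real row-to-JSON logic.
                final String converted = "{\"row\":\"" + (line == null ? "" : line) + "\"}";
                out.write(converted.getBytes(StandardCharsets.UTF_8));
            });
            session.transfer(json, REL_SUCCESS);
        }
    }
}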

Hope that helps.
Rick

-----Original Message-----
From: timF [mailto:tim@fogarty.org] 
Sent: Saturday, September 12, 2015 1:50 AM
To: dev@nifi.apache.org
Subject: custom processor - parse flowFile to many kafka messages

I need to create a custom processor.  

GetFile --> MyProcessor --> PutKafka

The incoming flowFile will be a very large text file. Each row of the file will need to be parsed, put into its own json object, and then sent to a kafka topic.  My question is the following: Do I need to write each JSON object to its own output flowFile.  That is if the input file contains N rows, and I want N messages to show up in the kafka topic, do I create N output flowFiles ?



--
View this message in context: http://apache-nifi-developer-list.39713.n7.nabble.com/custom-processor-parse-flowFile-to-many-kafka-messages-tp2782.html
Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.