Posted to dev@nifi.apache.org by Vibhath Ileperuma <vi...@gmail.com> on 2022/06/20 14:54:52 UTC

Nifi custom processor to consume two flow files at once.

Hi All,

We are planning to develop a custom NiFi processor which consumes two files
at once.
We have a set of files which contain two types of data; say 'A' type data
and 'B' type data. This processor receives these two types of files from two
upstream connections.
The processor needs to get one file from each of the connections at once and
then do the processing.
As I understand it, even though we connect multiple upstream connections as
inputs to a processor, in the code it treats all the data as coming from a
single upstream queue.
Is there a way to specify the number of input connections to the processor
and take one file from each connection? If not, what is the flow file
reading order when multiple input connections are available for a processor?

Thank You.

Best regards,

Vibhath.

Re: Nifi custom processor to consume two flow files at once.

Posted by Mark Payne <ma...@hotmail.com>.
Vibhath,

That’s correct, all of the data is received as if through a single connection. There’s no notion of named inputs.

Unfortunately, that makes this a pattern that’s a bit more difficult to implement than I’d like.
Generally, this is handled by adding an attribute to each FlowFile in your flow using UpdateAttribute.
Your processor can then use that attribute to tell what kind of FlowFile it is.
So you might have a flow like:

SourceA -> UpdateAttribute (add attribute 'type' with value 'Type1') --> YourProcessor
SourceB -> UpdateAttribute (add attribute 'type' with value 'Type2') --/

Then, in YourProcessor, you can get the FlowFiles using a FlowFileFilter so that you can grab one FlowFile of 'Type1' and one FlowFile of 'Type2'.
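To make the filter idea concrete, here is a self-contained sketch of the selection logic. In a real processor you would implement org.apache.nifi.processor.FlowFileFilter, whose filter(FlowFile) method returns a FlowFileFilter.FlowFileFilterResult, and pass it to ProcessSession.get(FlowFileFilter), which returns a List<FlowFile>. So that the sketch runs without NiFi on the classpath, the enum below only copies the real constant names, and SimpleFlowFile and the local get method are hypothetical stand-ins for FlowFile and the session. The attribute name 'type' and the values 'Type1'/'Type2' come from the flow above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class PairingFilterSketch {

    // Local copy of the constant names in NiFi's FlowFileFilter.FlowFileFilterResult.
    enum FilterResult { ACCEPT_AND_CONTINUE, ACCEPT_AND_TERMINATE, REJECT_AND_CONTINUE, REJECT_AND_TERMINATE }

    // Hypothetical stand-in for a FlowFile: an id plus its attributes.
    record SimpleFlowFile(String id, Map<String, String> attributes) {
        String getAttribute(String key) {
            return attributes.get(key);
        }
    }

    // Accepts at most one FlowFile per wanted 'type' value and stops once every type is held.
    // It is stateful, so a fresh instance is needed for each pull.
    static final class OnePerTypeFilter {
        private final Set<String> wanted;
        private final Set<String> seen = new HashSet<>();

        OnePerTypeFilter(Set<String> wanted) {
            this.wanted = wanted;
        }

        FilterResult filter(SimpleFlowFile flowFile) {
            String type = flowFile.getAttribute("type");
            // Skip unknown types and second copies of a type we already hold.
            if (type == null || !wanted.contains(type) || !seen.add(type)) {
                return FilterResult.REJECT_AND_CONTINUE;
            }
            return seen.containsAll(wanted)
                    ? FilterResult.ACCEPT_AND_TERMINATE
                    : FilterResult.ACCEPT_AND_CONTINUE;
        }
    }

    // Hypothetical simulation of ProcessSession.get(FlowFileFilter) over an in-memory queue.
    // The real method also removes accepted FlowFiles from the queue; this one only collects them.
    static List<SimpleFlowFile> get(List<SimpleFlowFile> queue, OnePerTypeFilter filter) {
        List<SimpleFlowFile> accepted = new ArrayList<>();
        for (SimpleFlowFile flowFile : queue) {
            FilterResult result = filter.filter(flowFile);
            if (result == FilterResult.ACCEPT_AND_CONTINUE || result == FilterResult.ACCEPT_AND_TERMINATE) {
                accepted.add(flowFile);
            }
            if (result == FilterResult.ACCEPT_AND_TERMINATE || result == FilterResult.REJECT_AND_TERMINATE) {
                break;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<SimpleFlowFile> queue = List.of(
                new SimpleFlowFile("a1", Map.of("type", "Type1")),
                new SimpleFlowFile("a2", Map.of("type", "Type1")),
                new SimpleFlowFile("b1", Map.of("type", "Type2")));
        List<SimpleFlowFile> pair = get(queue, new OnePerTypeFilter(Set.of("Type1", "Type2")));
        // Fewer than two results means one type is missing from the queue right now.
        System.out.println(pair.size() == 2 ? "pair: " + pair.get(0).id() + ", " + pair.get(1).id() : "incomplete");
    }
}
```

If only one type is queued, the pull returns a single FlowFile; a real processor would then need to hand it back (for example by rolling back the session) and try again on a later trigger.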

OR, as an alternative that may be easier, your processor could extend BinFiles instead of AbstractProcessor.
If you decide to go this route, you may want to take a look at JoinEnrichment. It uses this approach and does something similar where it needs two inputs, one of type ‘Original’ and one of type ‘Enrichment’.
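BinFiles groups incoming FlowFiles into bins keyed by a group identifier and hands a bin to the subclass once it is ready. The sketch below is not the BinFiles API; it is a plain-Java model of that binning idea for the two-type case, so consult BinFiles and JoinEnrichment for the real hooks. It assumes a hypothetical 'group.id' attribute (set, say, by UpdateAttribute) that tells which Type1 and Type2 FlowFiles belong together. Item and Pair are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

class PairBinnerSketch {

    // Hypothetical stand-in for a FlowFile: an id plus its attributes.
    record Item(String id, Map<String, String> attributes) {}

    // A completed bin: one 'Type1' item and one 'Type2' item that share a group id.
    record Pair(String groupId, Item type1, Item type2) {}

    // Open bins keyed by the hypothetical 'group.id' attribute; each bin maps 'type' -> item.
    private final Map<String, Map<String, Item>> bins = new LinkedHashMap<>();

    // Adds an item to its bin and returns the pair once that bin holds both types.
    Optional<Pair> offer(Item item) {
        String groupId = item.attributes().get("group.id");
        String type = item.attributes().get("type");
        if (groupId == null || type == null) {
            return Optional.empty(); // a real processor would route such FlowFiles to failure
        }
        Map<String, Item> bin = bins.computeIfAbsent(groupId, k -> new HashMap<>());
        bin.putIfAbsent(type, item); // later duplicates of a type are ignored in this sketch
        if (bin.containsKey("Type1") && bin.containsKey("Type2")) {
            bins.remove(groupId);
            return Optional.of(new Pair(groupId, bin.get("Type1"), bin.get("Type2")));
        }
        return Optional.empty();
    }

    // Number of bins still waiting for their partner.
    int openBins() {
        return bins.size();
    }

    public static void main(String[] args) {
        PairBinnerSketch binner = new PairBinnerSketch();
        binner.offer(new Item("a1", Map.of("group.id", "g1", "type", "Type1")));
        binner.offer(new Item("b2", Map.of("group.id", "g2", "type", "Type2")));
        binner.offer(new Item("b1", Map.of("group.id", "g1", "type", "Type2")))
                .ifPresent(p -> System.out.println(p.groupId() + ": " + p.type1().id() + " + " + p.type2().id()));
    }
}
```

Unlike the filter approach, FlowFiles do not have to arrive in any particular order: an unmatched FlowFile simply waits in its bin until its partner shows up.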

Thanks
-Mark