Posted to users@nifi.apache.org by Wolfgang Ederer <W....@intence.de> on 2019/04/16 14:03:34 UTC

Using result of HTTP request to filter flowfiles

Hello,

we are currently trying to replace an old microservice with NiFi.
The service gets data from a database and then checks, for each row, whether the data should be written to another database, based on an HTTP call to an API.
The API basically returns a JSON array of ids.
E.g. the table has rows with ids [1,3,4,5] and the API returns [2,4,5]; thus only rows [4,5] will be written to the target database.
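In code terms the filter is just a set intersection; here is a minimal Python sketch (outside NiFi) using the example ids from above:

    # Minimal sketch of the intended filter: keep only the rows
    # whose id also appears in the API response.
    table_ids = [1, 3, 4, 5]   # ids of the rows read from the source database
    api_ids = {2, 4, 5}        # ids returned by the API
    to_write = [i for i in table_ids if i in api_ids]
    print(to_write)            # [4, 5]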

I came up with multiple possible approaches, which I will describe in a simplified manner below (some processors are omitted).

Approach #1
One possibility to model this with NiFi would be to use an InvokeHTTP processor in conjunction with an ExecuteSQL processor, putting the ids from the HTTP response into the WHERE clause.
E.g. SELECT * FROM <sometable> WHERE id IN (<ids_from_invokehttp>)
This, however, degrades the performance of the query and causes timeouts.

  *   Not feasible

Approach #2
Alternatively, one could put the ids into a DistributedMapCache in another flow and check for them with a FetchDistributedMapCache processor.
However, it is not possible to clear the DistributedMapCache, and we need the most current result of the API call to filter the ids.

  *   Thus this approach is not feasible either.

Approach #3
Another approach would be to use a flow similar to the first one, but without the ids inside the WHERE clause of the SQL query.
Instead, one could write the result of the InvokeHTTP processor into an attribute via ExtractText and use it further down the flow to filter the flowfiles.
This is feasible and the only workable solution I came up with, but it adds quite a lot of overhead.
The list from the API is rather long (~500 KB), and after splitting the database result the overhead grows further, since each flowfile then carries the full list as an attribute.
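For illustration, the filtering step further down the flow could be a RouteOnAttribute property along these lines (the attribute names api_ids and id are made up for the example, and a bare substring check like this would still need delimiters so that e.g. id 4 does not also match 14):

    matched : ${api_ids:contains(${id})}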

Is there another solution which I am not seeing?
Is there a way to clear the DistributedMapCache to make approach #2 possible?


Thanks in advance!

Best regards
Wolfgang Ederer

Re: Using result of HTTP request to filter flowfiles

Posted by Wolfgang Ederer <W....@intence.de>.
Hi Mark,

thank you very much for your answer.
The LookupRecord processor did not fit my needs, but I came across the ScanAttribute processor while reading a post about LookupRecord.

By putting the result of the API into a file and using ScanAttribute on that file, I can basically "cache" the API response.
This means I can call the API once and use the result to filter multiple flowfiles without overhead.
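For anyone trying the same thing: ScanAttribute matches a flowfile attribute against a new-line-delimited dictionary file, so the JSON array from the API has to be flattened to one id per line first. A minimal Python sketch of that conversion (the file names are made up for the example):

    # Convert the API's JSON id array into the newline-delimited
    # dictionary file that ScanAttribute expects.
    import json

    with open("api_ids.json") as src:          # e.g. the response written out by InvokeHTTP
        ids = json.load(src)                   # -> [2, 4, 5]

    with open("dictionary.txt", "w") as dst:   # set as ScanAttribute's Dictionary File
        dst.write("\n".join(str(i) for i in ids))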

Thanks again!

Best regards,
Wolfgang

Re: Using result of HTTP request to filter flowfiles

Posted by Mark Payne <ma...@hotmail.com>.
Wolfgang,

So the general pattern that you're running into here is that you want to ingest data from Source 1
(a database), then filter/route the data based on a lookup in a second system/dataset (the API).
So, for this I would recommend that you look at the LookupRecord processor.

So the flow, I think, would look something like:

QueryDatabaseTable -> LookupRecord -> Target Database Processor (PutDatabaseRecord?)

The LookupRecord processor is configured with a Lookup Controller Service. The RestLookupService
may suffice for your use case. Alternatively, you could implement your own custom Lookup Service
that would take in the data from each record and perform the lookup.
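Conceptually, LookupRecord asks the lookup service once per record and routes on the answer; a rough Python sketch of that behavior (illustrative only, not NiFi's actual Java API):

    # Rough sketch of LookupRecord's per-record behavior when its
    # Routing Strategy routes records to 'matched'/'unmatched'.
    def route(record, lookup_service):
        coordinates = {"key": record["id"]}    # the key mapping is configured on the processor
        result = lookup_service(coordinates)   # e.g. a REST call via RestLookupService
        return "matched" if result is not None else "unmatched"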

I hope this helps!

-Mark

