Posted to users@nifi.apache.org by Valentina Ivanova <va...@ri.se> on 2020/11/19 15:29:09 UTC

Data enrichment using several keys

Hello everyone!

I am working on the following data enrichment scenario. I am receiving streaming data from which I extract several attributes. I now want to use three of these attributes as keys to query a CSV file and retrieve two values from it, something equivalent to this SQL statement:
SELECT value1, value2 FROM csv_file WHERE key1=att_key_1 and key2=att_key_2 and key3=att_key_3
I made this scenario work using QueryRecord; however, I need to read the CSV file every time I receive streaming data, which does not seem like a very scalable solution.

I have looked for other options and read about LookupRecord[1] and LookupAttribute[2], which are built for such scenarios, but unfortunately the respective LookupServices do not seem to allow specifying more than one key. I always get an ERROR saying that the expected key should be one of [my csv file attributes]. (Using one key at a time also results in an error saying that its values repeat, which is true, as only the combination of the three is unique.)
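
To make the requirement concrete, here is a minimal Groovy sketch of the lookup described above: the CSV is read once into an in-memory index keyed by the combination of the three key columns, so each incoming record becomes a map lookup rather than a file read. It is only an illustration under stated assumptions, not how QueryRecord or any NiFi LookupService works internally. The sample rows, the buildIndex helper, and the simple comma splitting (no quoted or embedded commas) are all hypothetical; the column names follow the SQL statement.

```groovy
// Hypothetical CSV content; column names follow the SQL statement in the post.
def csvText = '''key1,key2,key3,value1,value2
a,x,1,red,10
a,x,2,green,20
b,y,1,blue,30'''

// Read the text once and index each row by the list [key1, key2, key3].
// Assumes a simple CSV with no quoted or embedded commas.
def buildIndex(String text) {
    def lines = text.readLines().findAll { it.trim() }
    def header = lines.head().split(',')*.trim()
    def index = [:]
    lines.tail().each { line ->
        def cells = line.split(',')*.trim()
        // Pair each header name with the cell in the same position.
        def row = [header, cells].transpose().collectEntries { [(it[0]): it[1]] }
        // Lists compare by content, so a list of the three keys works as a map key.
        index.put([row.key1, row.key2, row.key3], [value1: row.value1, value2: row.value2])
    }
    index
}

def index = buildIndex(csvText)

// Each incoming record is then a map lookup rather than a file read.
def result = index.get(['a', 'x', '2'])
println result
```

Only the combination of the three values needs to be unique here; any single key may repeat across rows, as it does in the sample data.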

So I am wondering whether someone has encountered a similar problem and can offer any advice. (For now I would prefer not to implement a custom processor, but if nothing else works, any directions and templates that would help would also be much appreciated.)

Many thanks in advance,

Valentina


[1] https://community.cloudera.com/t5/Community-Articles/Data-flow-enrichment-with-NiFi-part-1-LookupRecord-processor/ta-p/246940
[2] https://community.cloudera.com/t5/Community-Articles/Data-flow-enrichment-with-NiFi-part-2-LookupAttribute/ta-p/247072


Re: Data enrichment using several keys

Posted by Valentina Ivanova <va...@ri.se>.
Hi Dirk,

Many thanks for sharing your code and experience! I will probably need to implement something like this.

Thanks again & all the best

Valentina
Re: Data enrichment using several keys

Posted by Dirk Arends <di...@fontis.com.au>.
Hi Valentina,

Not sure if this will work for you, but I have achieved two-key lookups using
a LookupService by creating a composite field in the CSV. Admittedly I was
creating the CSV in NiFi from another data source, so I had control of the
CSV's shape.
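
As a rough sketch of what "creating a composite field in the CSV" could look like, the Groovy below prepends a column built from the chosen key columns. The addCompositeKey helper, the sample rows, the column name "key", and the "-" delimiter are assumptions for illustration (the column names are borrowed from the script further down); it also assumes a simple CSV with no quoted or embedded commas.

```groovy
// Prepend a composite "key" column built from the named key columns.
// Assumes a simple CSV (no quoted or embedded commas) and that the
// delimiter never appears inside a key value.
String addCompositeKey(String csvText, List<String> keyColumns, String delimiter = '-') {
    def lines = csvText.readLines().findAll { it.trim() }
    def header = lines.head().split(',')*.trim()
    def positions = keyColumns.collect { header.indexOf(it) }
    if (positions.contains(-1)) {
        throw new IllegalArgumentException("Unknown key column in ${keyColumns}")
    }
    def out = ['key,' + header.join(',')]
    lines.tail().each { line ->
        def cells = line.split(',')*.trim()
        def key = positions.collect { cells[it] }.join(delimiter)
        out << (key + ',' + cells.join(','))
    }
    out.join('\n')
}

// Hypothetical reference data.
def original = '''ReferenceTypeId,Id,ReferenceTypeCode
10,1,ALPHA
10,2,BETA
20,1,GAMMA'''

def enriched = addCompositeKey(original, ['ReferenceTypeId', 'Id'])
println enriched
```

The same helper would take three column names for the three-key case in the original question; the new column is then the single key that a lookup service can be pointed at.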

You may be able to form a composite key in your streamed data without
needing a script, but I wrote a custom Groovy script to combine the two
fields in my dataset on the fly for use in the lookup.

// Near the start of the `onTrigger`
        def lookupService = context.getProperty(RECORD_LOOKUP).asControllerService(LookupService)

// Located in a function called from both locations of `// TODO process first record`
            final Optional<?> lookupValueOption

            def key = "${record.getValue('ReferenceTypeId')}-${record.getValue('Id')}".toString()

            def lookupCoordinates = ['key':key]

            try {
                lookupValueOption = lookupService.lookup(lookupCoordinates)
            } catch (final Exception e) {
                throw new ProcessException('Failed to lookup coordinates ' + lookupCoordinates + ' in Lookup Service', e)
            }

            if (!lookupValueOption.isPresent()) {
                getLogger().debug('Reference ' + lookupCoordinates + ' not found in Lookup Service. Not adding to reference map')
                continue
            }

            def Record lookupRecord = (Record) lookupValueOption.get()
            def referenceTypeCode = lookupRecord.getValue('ReferenceTypeCode')
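
The same idea extends to three keys. The standalone sketch below is not part of the script above and does not touch the NiFi API: it uses a plain map as a stand-in for the lookup service, and the compositeKey and lookup helpers, the sample values, and the "-" delimiter are all hypothetical. One thing it tries to illustrate is that joining values with a delimiter can collide when a value contains the delimiter itself ('a-b' + 'c' versus 'a' + 'b-c'), so the parts are escaped before joining.

```groovy
// Join key parts with a delimiter, escaping any delimiter (and backslash)
// inside a part so that distinct tuples cannot collide.
String compositeKey(List parts, String delimiter = '-') {
    parts.collect { part ->
        String.valueOf(part).replace('\\', '\\\\').replace(delimiter, '\\' + delimiter)
    }.join(delimiter)
}

// Stand-in for a lookup service: a plain map keyed by the composite string.
// The keys and values here are made up for illustration.
def table = [
    (compositeKey(['a', 'x', 1])): [value1: 'red', value2: 10],
    (compositeKey(['a', 'x', 2])): [value1: 'green', value2: 20]
]

// Wrap the result in an Optional, mirroring the isPresent() check above.
Optional lookup(Map table, List keyParts) {
    Optional.ofNullable(table[compositeKey(keyParts)])
}

def hit = lookup(table, ['a', 'x', 2])
def miss = lookup(table, ['a', 'x', 3])
println "hit=${hit.isPresent()} miss=${miss.isPresent()}"
```

Whatever rule builds the key on the streaming side has to match the rule used to build the composite column in the CSV, otherwise every lookup will miss.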

Matthew's blog post(s) helped me immensely with this adventure:

http://funnifi.blogspot.com/2019/04/record-processing-with.html

See how you go.

Cheers,
Dirk.
