Posted to users@nifi.apache.org by James Srinivasan <ja...@gmail.com> on 2018/12/17 17:44:47 UTC

NiFi JSON enrichment

Hi all,

I'm trying to enrich a data stream using NiFi. So far I have the following:

1) Stream of vehicle data in JSON format containing (id, make, model)
2) This vehicle data goes into HBase, using id as the row key and the
JSON data as the cell value (cf:json)
3) Stream of position data in JSON format, containing (id, lat, lon)
4) I extract the id from each of these items, then use FetchHBaseRow
to populate the hbase.row attribute with the JSON content
corresponding to that vehicle
5) I want to merge the NiFi attribute (which is actually JSON) into
the rest of the content, so I end up with (id, lat, lon, make, model).
This is where I am stuck - using the Jolt processor, I keep getting
"unable to unmarshal json to an object"
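
For reference, the HBase processors in steps 2 and 4 are configured
roughly like this (the table name is illustrative, and the id is
assumed to already be extracted into a flow file attribute):

    PutHBaseCell
        Table Name       = vehicles
        Row Identifier   = ${id}
        Column Family    = cf
        Column Qualifier = json

    FetchHBaseRow
        Table Name      = vehicles
        Row Identifier  = ${id}
        Destination     = flowfile-attributes  (row JSON lands in the hbase.row attribute)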

Caveats

1) I'm on NiFi 1.3
2) Much as I would like to use the new record functionality, I'm
trying to stay as schema agnostic as possible

Is this the right approach? Is there an easy way to add the attribute
value as a valid JSON object? Maybe ReplaceText capturing the trailing
} would work?
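
For example, something along these lines might work - a rough sketch
assuming the content is a single flat JSON object and hbase.row holds
just the vehicle JSON (note it would duplicate the id key if the
vehicle JSON also carries it):

    ReplaceText
        Evaluation Mode   = Entire text
        Search Value      = \}\s*$
        Replacement Value = ${hbase.row:replaceFirst('^\{', ',')}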

Thanks in advance,

James

Re: NiFi JSON enrichment

Posted by James Srinivasan <ja...@gmail.com>.
Good idea - this script is pretty close:

https://community.hortonworks.com/questions/75523/processor-for-replacing-json-values-dynamically-an.html

Thanks
On Mon, 17 Dec 2018 at 18:01, Andrew Grande <ap...@gmail.com> wrote:
>
> James,
>
> The easiest would be to merge the JSON in a custom processor. Not easy as in no work at all, but given the limitations of your NiFi version it could maybe be done sooner.
>
> Andrew

Re: NiFi JSON enrichment

Posted by James Srinivasan <ja...@gmail.com>.
Hi Austin,

We did consider enriching records in the GeoMesa converter, but we
also need the enriched records for other destinations, e.g.
ElasticSearch, hence we were keen to keep the enrichment in NiFi. As suggested, I
think I'll write a little Groovy script to merge JSON (looked up from
HBase) from NiFi attributes into the JSON flow file content.
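
Something like this minimal ExecuteScript (Groovy) sketch, assuming
hbase.row holds just the vehicle JSON from step 4 and that fields
already in the content should win:

    import groovy.json.JsonOutput
    import groovy.json.JsonSlurper
    import org.apache.nifi.processor.io.StreamCallback
    import java.nio.charset.StandardCharsets

    def flowFile = session.get()
    if (!flowFile) return

    // vehicle JSON fetched by FetchHBaseRow in step 4
    def enrichment = flowFile.getAttribute('hbase.row')
    if (enrichment) {
        flowFile = session.write(flowFile, { inputStream, outputStream ->
            def slurper = new JsonSlurper()
            def content = slurper.parse(inputStream)   // position record: id, lat, lon
            def extra = slurper.parseText(enrichment)  // vehicle record: id, make, model
            // merge, never clobbering fields already in the content
            extra.each { k, v -> if (!content.containsKey(k)) content[k] = v }
            outputStream.write(JsonOutput.toJson(content).getBytes(StandardCharsets.UTF_8))
        } as StreamCallback)
    }
    session.transfer(flowFile, REL_SUCCESS)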

Thanks,

James

On Thu, 27 Dec 2018 at 22:01, Austin Heyne <ah...@ccri.com> wrote:
>
> James,
>
> A little late to the show but hopefully this is useful.
>
> What we typically do for data enrichment is we'll use an EvaluateJsonPath processor to pull JSON fields out into attributes under a common key, e.g. foo.model. We then have a PutRedis processor that grabs everything under foo and adds them to a record keyed under an identifier for that feature. We then use a redis enrichment cache in the converter [1] to fill in the missing fields.
>
> For your use case I'd probably stream the vehicle data into a redis cache and then have the position updates hit that cache when they're actually being converted.
>
> -Austin
>
> [1] https://www.geomesa.org/documentation/user/convert/cache.html#

Re: NiFi JSON enrichment

Posted by Austin Heyne <ah...@ccri.com>.
James,

A little late to the show but hopefully this is useful.

What we typically do for data enrichment is we'll use an
EvaluateJsonPath processor to pull JSON fields out into attributes under
a common key, e.g. foo.model. We then have a PutRedis processor that
grabs everything under foo and adds them to a record keyed under an
identifier for that feature. We then use a redis enrichment cache in the
converter [1] to fill in the missing fields.
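
The EvaluateJsonPath step is just dynamic properties mapping new
attribute names to JSON paths, roughly like this (the foo prefix and
the fields are examples):

    EvaluateJsonPath
        Destination = flowfile-attribute
        foo.id      = $.id
        foo.make    = $.make
        foo.model   = $.model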

For your use case I'd probably stream the vehicle data into a redis 
cache and then have the position updates hit that cache when they're 
actually being converted.

-Austin

[1] https://www.geomesa.org/documentation/user/convert/cache.html#


On 12/18/2018 07:54 PM, Mike Thomsen wrote:
> James,
>
> Only skimmed this, but this looks like it might provide some 
> interesting ideas on how to transition from Protobuf to Avro:
>
> https://gist.github.com/alexvictoor/1d3937f502c60318071f
>
> Mike
-- 
Austin L. Heyne


Re: NiFi JSON enrichment

Posted by Mike Thomsen <mi...@gmail.com>.
James,

Only skimmed this, but this looks like it might provide some interesting
ideas on how to transition from Protobuf to Avro:

https://gist.github.com/alexvictoor/1d3937f502c60318071f

Mike

On Tue, Dec 18, 2018 at 3:07 PM Otto Fowler <ot...@gmail.com> wrote:

> What would be really cool is if you could also load the registry
> with your .protos somehow, configure it using the proto names, and then
> just have the registry convert them on demand

Re: NiFi JSON enrichment

Posted by Otto Fowler <ot...@gmail.com>.
What would be really cool is if you could also load the registry with
your .protos somehow, configure it using the proto names, and then just
have the registry convert them on demand.


On December 18, 2018 at 15:04:30, Otto Fowler (ottobackwards@gmail.com)
wrote:

You could implement a custom schema registry that converts the protos to
schemas on the fly and caches the results.



Re: NiFi JSON enrichment

Posted by Otto Fowler <ot...@gmail.com>.
You could implement a custom schema registry that converts the protos to
schemas on the fly and caches the results.
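
Roughly this shape, sketched in Groovy - protoToAvro here is
hypothetical, standing in for the real work of walking the compiled
protobuf Descriptor and emitting an Avro schema:

    import java.util.concurrent.ConcurrentHashMap

    // cache of proto message name -> schema text, built on first use
    def schemaCache = new ConcurrentHashMap<String, String>()

    // protoToAvro is hypothetical: the real descriptor-walking and
    // Avro schema generation would live here
    def protoToAvro = { String messageName ->
        throw new UnsupportedOperationException("sketch only: " + messageName)
    }

    // convert on demand, serve from the cache afterwards
    def schemaFor = { String messageName ->
        schemaCache.computeIfAbsent(messageName, protoToAvro)
    }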



Re: NiFi JSON enrichment

Posted by James Srinivasan <ja...@gmail.com>.
Yup, my example used made-up fields to keep it simple. In reality I
have between 20 and 80 fields per schema, with some nesting, arrays
etc.

It might be useful if I explained what I'm currently doing and why I'm
not using the record approach:

I've got around 20 different data streams in protobuf [1] format, which I
need to put into GeoMesa [2] and ElasticSearch. The best input format
for both of those is JSON.
I've written my own processor [3] to convert from protobuf to the
canonical protobuf JSON encoding. Unlike Avro, protobuf data does not
carry its schema, so this processor requires the .proto schema.
Finally, I've written GeoMesa converters from JSON into what is
required by GeoMesa (ElasticSearch just works). These converters are
actually automatically generated from the .proto schema.
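
The core of that conversion can be sketched with the stock
protobuf-java-util API - a sketch only, with descriptor loading, error
handling and the NiFi plumbing left out:

    @Grab('com.google.protobuf:protobuf-java-util:3.6.1')
    import com.google.protobuf.util.JsonFormat
    import com.google.protobuf.Descriptors.Descriptor
    import com.google.protobuf.DynamicMessage

    // parse raw protobuf bytes against a runtime Descriptor (e.g. loaded
    // from a descriptor set generated with protoc --descriptor_set_out),
    // then print the canonical JSON encoding, keeping the .proto field names
    String protobufToJson(Descriptor descriptor, byte[] bytes) {
        def message = DynamicMessage.parseFrom(descriptor, bytes)
        JsonFormat.printer().preservingProtoFieldNames().print(message)
    }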

So far, so good. For enrichment, I can pull out various JSON elements,
look them up in HBase and (thanks to Andrew/Matt) merge the results
back into the outgoing JSON.

The record approach would allow me to do all the above, but in a more
strongly typed way, and would probably be more performant. However,
I'd have to write and maintain a NiFi Record Schema (Avro schema?) for
each of the .proto schemas (assuming that is possible), which seemed
like overhead for little potential gain.

I could instead convert protobuf to Avro at the start, but that seemed
non-trivial. I guess the main underlying question is how NiFi Record
Schemas are meant to work when the source data already has its own
schema definition language (right now this is only Avro?)

Hope this makes some sense, I'm certainly not against using Records
given more time & effort.

James

[1] https://developers.google.com/protocol-buffers/
[2] https://geomesa.org
[3] Which I hope to contribute back
On Mon, 17 Dec 2018 at 18:24, Bryan Bende <bb...@gmail.com> wrote:
>
> I know you mentioned staying schema agnostic, but if you went with the
> record approach then this sounds like a good fit for the HBase lookup
> service.
>
> Steps 3-5 would be using LookupRecord with an HBaseLookupService where
> you look up by row id, and put the results into the current record.
>
> I'm not sure if your example used made up fields, but if not, then
> you'd just need a schema that had the 5 fields defined.

Re: NiFi JSON enrichment

Posted by Bryan Bende <bb...@gmail.com>.
I know you mentioned staying schema agnostic, but if you went with the
record approach then this sounds like a good fit for the HBase lookup
service.

Steps 3-5 would be using LookupRecord with an HBaseLookupService where
you look up by row id and put the results into the current record.

I'm not sure if your example used made up fields, but if not, then
you'd just need a schema that had the 5 fields defined.
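
For example, an Avro schema with those five fields could be as small as
this (names taken from the example; make and model nullable since the
lookup fills them in):

    {
      "type" : "record",
      "name" : "VehiclePosition",
      "fields" : [
        { "name" : "id",    "type" : "string" },
        { "name" : "lat",   "type" : "double" },
        { "name" : "lon",   "type" : "double" },
        { "name" : "make",  "type" : [ "null", "string" ], "default" : null },
        { "name" : "model", "type" : [ "null", "string" ], "default" : null }
      ]
    }
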
On Mon, Dec 17, 2018 at 1:01 PM Andrew Grande <ap...@gmail.com> wrote:
>
> James,
>
> The easiest would be to merge json in a custom processor. Not easy as in no work at all, but given your limitations with the NiFi version could be done sooner maybe.
>
> Andrew

Re: NiFi JSON enrichment

Posted by Andrew Grande <ap...@gmail.com>.
James,

The easiest would be to merge the JSON in a custom processor. Not easy as in
no work at all, but given the limitations of your NiFi version it could
maybe be done sooner.

Andrew
