Posted to users@nifi.apache.org by Stéphane Maarek <st...@gmail.com> on 2016/05/26 04:30:59 UTC

Guidance for NiFi output streaming

Hi,

I need to output some data streaming from multiple devices directly into a
map (mapboxjs).

Basically, every 1 second, I want to write only the last data point for
each device to a JSON file. My problem is how to pick the latest
data point for each device.

My incoming flow file has three attributes: device_id, lat, lon.
At some point they may queue up like this:

1, (-37,20)
1, (-37.1,20.1)
2, (-40,30)
2, (-40.1, 29.9)

At the end, I wish to only have the latest point for each device ID
1, (-37.1,20.1)
2, (-40.1, 29.9)
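The selection described above amounts to keeping, for each device_id, the last point seen. A minimal sketch of just that reduction in Python, assuming the points are presented in arrival order (the `Point` tuple and `latest_per_device` name are hypothetical, not NiFi APIs):

```python
from typing import Dict, Iterable, Tuple

Point = Tuple[str, float, float]  # hypothetical shape: (device_id, lat, lon)


def latest_per_device(points: Iterable[Point]) -> Dict[str, Tuple[float, float]]:
    """Keep only the last (lat, lon) seen for each device_id.

    Assumes `points` is in arrival order, so a later entry for the
    same device overwrites an earlier one.
    """
    latest: Dict[str, Tuple[float, float]] = {}
    for device_id, lat, lon in points:
        latest[device_id] = (lat, lon)
    return latest


queued = [("1", -37.0, 20.0), ("1", -37.1, 20.1),
          ("2", -40.0, 30.0), ("2", -40.1, 29.9)]
print(latest_per_device(queued))  # {'1': (-37.1, 20.1), '2': (-40.1, 29.9)}
```

A dict keyed by device_id does the work: each device costs one entry no matter how many points it sent.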

How can I design a processor for this?

Thanks!
Stephane

Re: Guidance for NiFi output streaming

Posted by Stéphane Maarek <st...@gmail.com>.
Hi,

@Joe: you're right, it would be tedious to store the state, and it probably
isn't the best route. The ingestion rate is one point every 0.1 sec
per device.
@Thad: I really like your suggestion; it makes my life a little bit easier.
I'm going to use MongoDB and perform upserts with the PutMongo
processor that already exists. It really streamlines my flow.

Thanks both for the suggestions!

Cheers,
Stephane

On Fri, May 27, 2016 at 12:59 AM Thad Guidry <th...@gmail.com> wrote:

> BTW, the idea mentioned previously is a batching processor, similar to what
> Spring Batch and other data tools provide out of the box. Not sure if NiFi
> already has that concept in one of its processors, or if you have to
> resort to Groovy or the ExecuteScript processor.
>
> Thad
> +ThadGuidry <https://www.google.com/+ThadGuidry>
>
>

Re: Guidance for NiFi output streaming

Posted by Thad Guidry <th...@gmail.com>.
BTW, the idea mentioned previously is a batching processor, similar to what
Spring Batch and other data tools provide out of the box. Not sure if NiFi
already has that concept in one of its processors, or if you have to
resort to Groovy or the ExecuteScript processor.

Thad

Re: Guidance for NiFi output streaming

Posted by Thad Guidry <th...@gmail.com>.
Why use a processor to do the filtering work? Why filter at all? What
if you just kept flowing and updating?

Why not just store the value in SQL or some other database and perform an
UPDATE using the device_id in the WHERE clause?

Choosing a database that supports JSON natively, such as PostgreSQL, MySQL,
MongoDB, or SQL Server, will let you query it and get JSON output directly.
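A minimal sketch of the "UPDATE by device_id, otherwise INSERT" idea, using Python's built-in `sqlite3` module as a stand-in for PostgreSQL, MySQL, or MongoDB (an assumption for illustration only). The table name `positions` and both helper functions are hypothetical, and the JSON is assembled in Python rather than by a database's native JSON functions:

```python
import json
import sqlite3


def upsert_position(conn: sqlite3.Connection, device_id: str, lat: float, lon: float) -> None:
    """UPDATE the row for device_id; INSERT it if no row matched."""
    cur = conn.execute(
        "UPDATE positions SET lat = ?, lon = ? WHERE device_id = ?",
        (lat, lon, device_id),
    )
    if cur.rowcount == 0:  # first point ever seen for this device
        conn.execute(
            "INSERT INTO positions (device_id, lat, lon) VALUES (?, ?, ?)",
            (device_id, lat, lon),
        )
    conn.commit()


def positions_as_json(conn: sqlite3.Connection) -> str:
    """Return the current position of every device as a JSON array."""
    rows = conn.execute(
        "SELECT device_id, lat, lon FROM positions ORDER BY device_id"
    ).fetchall()
    return json.dumps([{"device_id": d, "lat": la, "lon": lo} for d, la, lo in rows])


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE positions (device_id TEXT PRIMARY KEY, lat REAL, lon REAL)")
for point in [("1", -37.0, 20.0), ("1", -37.1, 20.1), ("2", -40.0, 30.0), ("2", -40.1, 29.9)]:
    upsert_position(conn, *point)
print(positions_as_json(conn))
```

No filtering step is needed: every point is written, and the table only ever holds one row per device, so a map that polls `positions_as_json` always sees the latest points.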

(Or you could also explore storing the last 100 received points in the DB,
using a sequence generator that gets reset every X, built with Groovy
and the ExecuteScript processor, essentially having a rolling last 100 that
gets continuously updated.)
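One way to read the "rolling last 100" idea is as a fixed set of 100 slots, where each new point overwrites slot `seq % 100`; taking the sequence modulo 100 behaves like a counter that resets every 100. A minimal Python/SQLite sketch under that assumption (the Groovy/ExecuteScript part is not shown; the table `recent`, `store_rolling`, and `WINDOW` are hypothetical, and `INSERT OR REPLACE` is SQLite-specific syntax):

```python
import sqlite3

WINDOW = 100  # "last 100" from the message; assumed to equal the reset interval X


def store_rolling(conn: sqlite3.Connection, seq: int, device_id: str, lat: float, lon: float) -> None:
    """Write the point into slot seq % WINDOW, overwriting whatever was there."""
    conn.execute(
        "INSERT OR REPLACE INTO recent (slot, seq, device_id, lat, lon) VALUES (?, ?, ?, ?, ?)",
        (seq % WINDOW, seq, device_id, lat, lon),
    )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE recent (slot INTEGER PRIMARY KEY, seq INTEGER, device_id TEXT, lat REAL, lon REAL)"
)
for seq in range(250):  # 250 incoming points spread over three devices
    store_rolling(conn, seq, str(seq % 3), -37.0, 20.0)
count, oldest, newest = conn.execute("SELECT COUNT(*), MIN(seq), MAX(seq) FROM recent").fetchone()
print(count, oldest, newest)  # 100 150 249
```

The table never grows past 100 rows, and the stored `seq` column lets a query order the retained points from oldest to newest.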

Thad

Re: Guidance for NiFi output streaming

Posted by Joe Percivall <jo...@yahoo.com>.
Hello Stephane,

Just to be sure I have your use-case correct, you are ingesting a continuous stream of lat/lon information for various devices. Every 1 second you want to take the information from the previous second and write out just the most recent lat/lon of each device. 

An important question: do you only want this file to include devices that have been seen in the last second, or do you want to write out the last known lat/lon of every device seen? That is an important question because it is the difference between having to store state or not. If you need the last known lat/lon of all devices seen, and thus need to store state, the use case gets much trickier.
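The difference between the two interpretations can be sketched as whether the per-device map is cleared at the start of each 1-second window. A minimal Python illustration; the class `PositionSnapshot` and its `keep_state` flag are hypothetical names, and the in-memory dict stands in for whatever state store a real processor would need:

```python
from typing import Dict, Iterable, Tuple

Point = Tuple[str, float, float]  # hypothetical shape: (device_id, lat, lon)


class PositionSnapshot:
    """Builds the per-second output under either interpretation."""

    def __init__(self, keep_state: bool) -> None:
        self.keep_state = keep_state
        self.last_known: Dict[str, Tuple[float, float]] = {}

    def window(self, points: Iterable[Point]) -> Dict[str, Tuple[float, float]]:
        if not self.keep_state:
            # Stateless: only devices seen in this window are reported.
            self.last_known = {}
        for device_id, lat, lon in points:
            self.last_known[device_id] = (lat, lon)
        return dict(self.last_known)


stateless = PositionSnapshot(keep_state=False)
stateful = PositionSnapshot(keep_state=True)
second_1 = [("1", -37.1, 20.1), ("2", -40.1, 29.9)]
second_2 = [("1", -37.2, 20.2)]  # device 2 sent nothing this second
for snap in (stateless, stateful):
    snap.window(second_1)
print(stateless.window(second_2))  # {'1': (-37.2, 20.2)}
print(stateful.window(second_2))   # {'1': (-37.2, 20.2), '2': (-40.1, 29.9)}
```

In the stateful case, the dict grows with every device ever seen and is lost if the process restarts, which is part of why that variant is harder.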

Another question: what order of magnitude of data are you planning on ingesting? If it's relatively low and your use case does not need to store state, you could create a processor that would analyze all FlowFiles currently on the queue to grab the latest lat/lon for each device and then emit a FlowFile whose content is the file you want to write. Set it to trigger every 1 second and it would batch up the latest lat/lon for each device for the previous second. As with MergeContent, this would start to cause problems when it tries to batch up a large quantity of FlowFiles.
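The drain-and-emit step of such a processor can be sketched outside NiFi with a standard `queue.Queue` standing in for the connection's FlowFile queue (an assumption; this is not the NiFi processor API). Each call takes everything currently queued, keeps the newest point per device, and returns the JSON text that would become the outgoing FlowFile's content. The 1-second trigger would come from the processor's run schedule and is not modeled here; `drain_and_emit` is a hypothetical name:

```python
import json
import queue
from typing import Dict, Tuple


def drain_and_emit(q: "queue.Queue[Tuple[str, float, float]]") -> str:
    """Drain the queue, keep the newest point per device, return JSON text."""
    latest: Dict[str, Tuple[float, float]] = {}
    while True:
        try:
            device_id, lat, lon = q.get_nowait()
        except queue.Empty:
            break  # everything queued so far has been consumed
        latest[device_id] = (lat, lon)  # later items overwrite earlier ones
    return json.dumps(
        [{"device_id": d, "lat": lat, "lon": lon} for d, (lat, lon) in latest.items()]
    )


incoming: "queue.Queue[Tuple[str, float, float]]" = queue.Queue()
for point in [("1", -37.0, 20.0), ("1", -37.1, 20.1), ("2", -40.0, 30.0), ("2", -40.1, 29.9)]:
    incoming.put(point)
print(drain_and_emit(incoming))
```

While draining, the sketch holds one entry per device, but the queue itself still has to buffer a full second of input. At the stated rate of one point every 0.1 sec per device, that is about 10 points per device per second.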
 
Joe

- - - - - - 
Joseph Percivall
linkedin.com/in/Percivall
e: joepercivall@yahoo.com



On Thursday, May 26, 2016 1:06 AM, Stéphane Maarek <st...@gmail.com> wrote:



I tried a ControlRate processor, but it doesn't work: it seems to stop processing once the threshold of 1 is reached, even though I set a grouping property (I know there are two different values for my group in my queue). Any clue?


Re: Guidance for NiFi output streaming

Posted by Stéphane Maarek <st...@gmail.com>.
I tried a ControlRate processor, but it doesn't work: it seems to stop
processing once the threshold of 1 is reached, even though I set a grouping
property (I know there are two different values for my group in my queue).
Any clue?
