Posted to users@nifi.apache.org by Kelsey RIDER <ke...@ineat-conseil.fr> on 2018/06/21 07:13:45 UTC

NiFi and Mongo

Hello,

I've been experimenting with NiFi and MongoDB. I have a test collection with 1 million documents in it. Each document has the same flat JSON structure with 11 fields.
My NiFi flow exposes a webservice, which allows the user to fetch all the data in CSV format.

However, 1M documents brings NiFi to its knees. Even after increasing the JVM's Xms and Xmx to 2G, I still get an OutOfMemoryError:

2018-06-20 11:27:43,428 WARN [Timer-Driven Process Thread-7] o.a.n.controller.tasks.ConnectableTask Admng.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3332)
        at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
        at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
        at java.lang.StringBuilder.append(StringBuilder.java:136)
        at org.apache.nifi.processors.mongodb.GetMongo.buildBatch(GetMongo.java:222)
        at org.apache.nifi.processors.mongodb.GetMongo.onTrigger(GetMongo.java:341)
        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1147)
        at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:175)
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenScheduling
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThr
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPool
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

I dug into the code, and discovered that the GetMongo processor takes all the Documents returned from Mongo, converts them to Strings, and concatenates them in a StringBuilder.

My question is thus: is there a better way that I should be doing this?
The only idea I've had is to use a smaller batch size, but that would mean that I'd just need a later processor to concatenate the batches in order to get one big CSV.
Is there some sort of "GetMongoRecord" processor that reads each mongo Document as a record, in the way ExecuteSQL does? (I've done the same test with an SQL database, and it handles 1M records just fine.)
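
For illustration, the smaller-batch idea could look roughly like the sketch below. It is plain Python, not NiFi code: `fake_documents` stands in for a Mongo cursor, and `batches` and `write_csv` are hypothetical helper names. Each batch is written out as soon as it is read and the CSV header is emitted only once, so memory use depends on the batch size rather than on the size of the collection.

```python
import csv
import io
from itertools import islice
from typing import Dict, Iterable, Iterator, List


def fake_documents(count: int) -> Iterator[Dict[str, object]]:
    """Stand-in for a Mongo cursor: yields flat documents one at a time."""
    for i in range(count):
        yield {"id": i, "name": f"user{i}", "score": i % 7}


def batches(items: Iterable, size: int) -> Iterator[List]:
    """Group an iterable into lists of at most `size` items."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def write_csv(docs: Iterable[Dict[str, object]], out, fieldnames: List[str],
              batch_size: int = 1000) -> int:
    """Write documents to `out` batch by batch; the header is written once."""
    writer = csv.DictWriter(out, fieldnames=fieldnames)
    writer.writeheader()
    written = 0
    for batch in batches(docs, batch_size):
        # Only one batch is held in memory at a time.
        writer.writerows(batch)
        written += len(batch)
    return written


if __name__ == "__main__":
    # A real flow would write to a file or an HTTP response stream, not a StringIO.
    buffer = io.StringIO()
    total = write_csv(fake_documents(2500), buffer, ["id", "name", "score"])
    print(total)
```

The design choice is that no step ever needs to concatenate the batches afterwards: the output stream itself is the single CSV.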

Thanks for your help,

Kelsey
Following changes to labor regulations, if you receive this email before 7:00 a.m., in the evening, over the weekend, or during your holidays, please do not process or reply to it immediately, except in cases of exceptional urgency.

Re: NiFi and Mongo

Posted by Scott Howell <sc...@mobilgov.com>.
Kelsey,

I know it’s not the best suggestion, but if NiFi Expression Language can be used for the fields, you could use an UpdateAttribute processor to add attributes to the flow file depending on some other attribute, so that you’re only adding the attributes needed for that particular flow file. Then use those attributes to specify the fields on the JSONPathReader. It might not be the best method, but I have used it and it works fairly well.

Scott

> On Jun 21, 2018, at 7:00 AM, Kelsey RIDER <ke...@ineat-conseil.fr> wrote:
> 
> OK, thanks for the heads-up!
>  
> If I could make another suggestion: could the JSONPathReader be made a little more dynamic? Currently you have to specify every single field…
>  
> In my case (although I doubt I’m alone), I have several different collections with different schemas. My options are either to have one JSONPathReader with dozens of attributes, or else one Reader per collection type (but then I’d have to somehow dynamically choose which reader to use). It would be easier if there were a way to have a single expression (wildcards? Regex?) that could pick up several properties at once.


Re: NiFi and Mongo

Posted by Joe Witt <jo...@gmail.com>.
Kelsey

Have you looked at JSONTreeReader?

thanks
joe

On Thu, Jun 21, 2018, 5:00 AM Kelsey RIDER <ke...@ineat-conseil.fr>
wrote:

> OK, thanks for the heads-up!
>
> If I could make another suggestion: could the JSONPathReader be made a
> little more dynamic? Currently you have to specify every single field…
>
> In my case (although I doubt I’m alone), I have several different
> collections with different schemas. My options are either to have one
> JSONPathReader with dozens of attributes, or else one Reader per collection
> type (but then I’d have to somehow dynamically choose which reader to use).
> It would be easier if there were a way to have a single expression
> (wildcards? Regex?) that could pick up several properties at once.

RE: NiFi and Mongo

Posted by Kelsey RIDER <ke...@ineat-conseil.fr>.
OK, thanks for the heads-up!

If I could make another suggestion: could the JSONPathReader be made a little more dynamic? Currently you have to specify every single field…

In my case (although I doubt I’m alone), I have several different collections with different schemas. My options are either to have one JSONPathReader with dozens of attributes, or else one Reader per collection type (but then I’d have to somehow dynamically choose which reader to use). It would be easier if there were a way to have a single expression (wildcards? Regex?) that could pick up several properties at once.
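
To make the request concrete, here is a minimal Python sketch of what a single-expression selection might do for a flat document. It is not an existing JSONPathReader option: `select_fields` is a hypothetical helper, and the field names in the sample document are invented for the example.

```python
import json
import re
from typing import Dict, Pattern


def select_fields(document: Dict[str, object], pattern: Pattern[str]) -> Dict[str, object]:
    """Keep only the top-level fields whose names fully match `pattern`."""
    return {key: value for key, value in document.items() if pattern.fullmatch(key)}


if __name__ == "__main__":
    # Invented sample document; a real one would come from a Mongo collection.
    doc = json.loads('{"_id": "a1", "first_name": "Ada", "last_name": "Lovelace", "age": 36}')
    # One expression picks up every field ending in "_name".
    print(select_fields(doc, re.compile(r".*_name")))
```

With something like this, one reader could serve several collections by changing only the expression instead of listing every field.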


From: Mike Thomsen <mi...@gmail.com>
Sent: Thursday, 21 June 2018 13:06
To: users@nifi.apache.org
Subject: Re: NiFi and Mongo

Your general assessment about what you'd need is correct. It's a fairly easy component to build, and I'll throw up a Jira ticket for it. Would definitely be doable for NiFi 1.8.

Expect the Mongo stuff to go through some real cleanup like this in 1.8. One of the other big changes is that I will be moving the processors to use a controller service as an optional configuration for the Mongo client. The plan is that, probably by 1.9, all of the Mongo processors will drop their own client configurations and share the same pool (currently every processor instance maintains its own).


Re: NiFi and Mongo

Posted by Mike Thomsen <mi...@gmail.com>.
Your general assessment about what you'd need is correct. It's a fairly
easy component to build, and I'll throw up a Jira ticket for it. Would
definitely be doable for NiFi 1.8.

Expect the Mongo stuff to go through some real cleanup like this in 1.8.
One of the other big changes is that I will be moving the processors to use
a controller service as an optional configuration for the Mongo client. The
plan is that, probably by 1.9, all of the Mongo processors will drop their
own client configurations and share the same pool (currently every
processor instance maintains its own).
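
As a rough picture of the difference between per-processor clients and a shared one, here is a minimal Python sketch. It is not the NiFi controller service API: `FakeClient`, `ClientService`, and `Processor` are hypothetical names, and the sketch ignores thread safety.

```python
from typing import Dict


class FakeClient:
    """Stand-in for a database client that owns a connection pool."""

    instances = 0  # counts how many clients (and thus pools) were created

    def __init__(self, uri: str) -> None:
        self.uri = uri
        FakeClient.instances += 1


class ClientService:
    """Hands the same client to every caller that asks for the same URI."""

    def __init__(self) -> None:
        self._clients: Dict[str, FakeClient] = {}

    def get_client(self, uri: str) -> FakeClient:
        if uri not in self._clients:
            self._clients[uri] = FakeClient(uri)
        return self._clients[uri]


class Processor:
    """A processor that borrows its client from the shared service."""

    def __init__(self, name: str, service: ClientService, uri: str) -> None:
        self.name = name
        self.client = service.get_client(uri)


if __name__ == "__main__":
    service = ClientService()
    getter = Processor("get", service, "mongodb://localhost:27017")
    putter = Processor("put", service, "mongodb://localhost:27017")
    print(getter.client is putter.client)
```

Two processors configured with the same URI end up sharing one client, instead of each creating and maintaining its own.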

On Thu, Jun 21, 2018 at 3:13 AM Kelsey RIDER <ke...@ineat-conseil.fr>
wrote:

> My question is thus: is there a better way that I should be doing this?
>
> The only idea I’ve had is to use a smaller batch size, but that would mean
> that I’d just need a later processor to concatenate the batches in order to
> get one big CSV.
>
> Is there some sort of “GetMongoRecord” processor that reads each mongo
> Document as a record, in the way ExecuteSQL does? (I’ve done the same test
> with an SQL database, and it handles 1M records just fine.)