Posted to users@nifi.apache.org by Stéphane Maarek <st...@gmail.com> on 2016/07/10 23:42:47 UTC

Create a PutRethinkDB processor

Hi,

I've been thinking about implementing a RethinkDB processor as I need
one for my project. Right now, if I put my code inside of an ExecuteScript,
I basically connect to the database as many times as I'm inserting
documents, and that's rather inefficient (I believe). The best I can get is
to insert 90 documents a second. Also, it seems that I can't increase the
number of concurrent tasks on this processor.

Here's my test code for reference (python):
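import rethinkdb as r

flowFile = session.get()
if flowFile is not None:
    # Opens a new connection for every FlowFile (the suspected bottleneck)
    conn = r.connect('<myhost>', 28015)
    r.table('tv_shows').insert({'name': 'Star Trek TNG'}).run(
        conn, durability="soft", noreply=True)
    conn.close()
    session.transfer(flowFile, REL_SUCCESS)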

I have been thinking of doing some kind of implementation that's similar to
PutMongo. I see there is an @OnScheduled annotation that connects to the
database. Is this piece of code run every time a flowfile arrives, or is it
more "smartly" run? Also, can I, instead of going the long way and building
a NAR, use InvokeScriptedProcessor, alongside the @OnScheduled annotation?
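
For reference on the lifecycle question: in NiFi, a method annotated with @OnScheduled is invoked each time the processor is started (scheduled), not each time a FlowFile arrives, while onTrigger is called repeatedly as work comes in. The Python below is a toy model of that lifecycle, not NiFi's API: `FakeConnection`, `ToyPutProcessor` and its `on_scheduled`/`on_trigger`/`on_stopped` methods are hypothetical stand-ins for a RethinkDB connection and a processor's annotated methods. It shows one connection serving every document between start and stop.

```python
from typing import Dict, List, Optional

Doc = Dict[str, str]


class FakeConnection:
    """Hypothetical stand-in for a database connection; records inserts."""

    def __init__(self) -> None:
        self.rows: List[Doc] = []
        self.is_open = True

    def insert(self, doc: Doc) -> None:
        if not self.is_open:
            raise RuntimeError("connection is closed")
        self.rows.append(doc)

    def close(self) -> None:
        self.is_open = False


class ToyPutProcessor:
    """Toy model of a Put processor's lifecycle (not the NiFi API)."""

    def __init__(self) -> None:
        self.conn: Optional[FakeConnection] = None
        self.connections_opened = 0

    def on_scheduled(self) -> None:
        # Role of an @OnScheduled method: runs once each time the processor starts.
        self.conn = FakeConnection()
        self.connections_opened += 1

    def on_trigger(self, doc: Doc) -> None:
        # Role of onTrigger: runs per unit of work and reuses the open connection.
        if self.conn is None:
            raise RuntimeError("processor is not scheduled")
        self.conn.insert(doc)

    def on_stopped(self) -> None:
        # Role of an @OnStopped method: release the connection.
        if self.conn is not None:
            self.conn.close()
            self.conn = None


if __name__ == "__main__":
    proc = ToyPutProcessor()
    proc.on_scheduled()
    conn = proc.conn
    for i in range(1000):
        proc.on_trigger({"name": "show-%d" % i})
    proc.on_stopped()
    print(proc.connections_opened, len(conn.rows))  # 1 1000
```

In this model, 1,000 documents go through a single connection; the per-FlowFile reconnect in the ExecuteScript test above is the cost a real @OnScheduled setup method would be expected to avoid.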

Finally, I seem to be quickly running into PermGen space issues. Is that
expected?

Thanks,
Stephane

Re: Create a PutRethinkDB processor

Posted by Joe Witt <jo...@gmail.com>.
Stephane,

Yes, you'll want to create a new processor and package it within a NAR
with proper licensing and dependency management.  You'd be
establishing a reusable and self-contained bundle that will
automatically work nicely within the UI, have documentation generated
from your annotations, and have the framework handle many of the
error conditions for you.  It's a good deal :-)

Thanks
Joe

On Sun, Jul 10, 2016 at 10:12 PM, Stéphane Maarek
<st...@gmail.com> wrote:
> Hi Matt,
>
> Thanks for your detailed response. So how do you recommend I proceed with
> creating a good, stable PutRethinkDB processor? Do I really have to write it
> entirely and then package it as a NAR? I can see that it could bring some
> batching optimizations too...
>
> Thanks,
> Stephane

Re: Create a PutRethinkDB processor

Posted by Stéphane Maarek <st...@gmail.com>.
Hi Matt,

Thanks for your detailed response. So how do you recommend I proceed with
creating a good, stable PutRethinkDB processor? Do I really have to write it
entirely and then package it as a NAR? I can see that it could bring some
batching optimizations too...
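
On the batching point: one common approach is to buffer documents and send each group in a single insert call instead of one call per document (RethinkDB's insert also accepts an array of documents). The sketch below assumes a hypothetical `insert_many` callable that writes one list per round trip over a reused connection; `batched`, `put_all`, and the batch size of 100 are illustrative choices, not part of any existing NiFi or RethinkDB API.

```python
from typing import Callable, Dict, Iterable, Iterator, List

Doc = Dict[str, str]


def batched(docs: Iterable[Doc], size: int) -> Iterator[List[Doc]]:
    """Yield lists of at most `size` documents, preserving input order."""
    if size < 1:
        raise ValueError("size must be >= 1")
    batch: List[Doc] = []
    for doc in docs:
        batch.append(doc)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # flush the final, possibly smaller, batch
        yield batch


def put_all(docs: Iterable[Doc],
            insert_many: Callable[[List[Doc]], None],
            size: int = 100) -> int:
    """Send documents in batches and return the number of insert calls.

    `insert_many` is hypothetical: for RethinkDB it might wrap something
    like r.table(...).insert(batch).run(conn) on an already-open connection.
    """
    calls = 0
    for batch in batched(docs, size):
        insert_many(batch)
        calls += 1
    return calls


if __name__ == "__main__":
    written: List[Doc] = []
    docs = ({"name": "show-%d" % i} for i in range(250))
    print(put_all(docs, written.extend, size=100), len(written))  # 3 250
```

With 250 documents and a batch size of 100, this makes three insert calls instead of 250; the right batch size for a real processor would need measuring.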

Thanks,
Stephane

On Mon, Jul 11, 2016 at 10:46 AM Matt Burgess <ma...@gmail.com> wrote:

> Stéphane,
>
> In 0.7.0 and forward, you will be able to set the number of concurrent
> tasks for ExecuteScript to whatever you like [1].  For
> InvokeScriptedProcessor, a current issue is that it only expects (and
> interacts with) a Processor interface, which includes an "initialize"
> method but doesn't check for an @OnScheduled annotation. Also, the
> initialize() method of Processor gets called when
> InvokeScriptedProcessor is scheduled and notices the script needs to
> be reloaded, which is when the Script File/Body, Engine, or Module
> Directory properties are modified (via the UI or REST or whatever). So
> theoretically the scripted processor's initialize() method is called
> when scheduled (as if it were an @OnScheduled), but only if something
> has changed. This could definitely be an improvement Jira, in which
> scripted processors could have their own annotated methods (especially
> @OnStopped, since there is no call, even an indirect one, to anything that
> stops the scripted processor). However, this would only work for Jython
> [2], JRuby [3], Groovy, and any other included JSR-223 language that
> supports Java annotations. I've written this up as [4].
>
>
> Regards,
> Matt
>
> [1] https://issues.apache.org/jira/browse/NIFI-1822
> [2] http://www.fiber-space.de/jynx/doc/jannotations.html
> [3] https://github.com/jruby/jruby/wiki/JRuby-Reference#java_annotation
> [4] https://issues.apache.org/jira/browse/NIFI-2215

Re: Create a PutRethinkDB processor

Posted by Matt Burgess <ma...@gmail.com>.
Stéphane,

In 0.7.0 and forward, you will be able to set the number of concurrent
tasks for ExecuteScript to whatever you like [1].  For
InvokeScriptedProcessor, a current issue is that it only expects (and
interacts with) a Processor interface, which includes an "initialize"
method but doesn't check for an @OnScheduled annotation. Also, the
initialize() method of Processor gets called when
InvokeScriptedProcessor is scheduled and notices the script needs to
be reloaded, which is when the Script File/Body, Engine, or Module
Directory properties are modified (via the UI or REST or whatever). So
theoretically the scripted processor's initialize() method is called
when scheduled (as if it were an @OnScheduled), but only if something
has changed. This could definitely be an improvement Jira, in which
scripted processors could have their own annotated methods (especially
@OnStopped, since there is no call, even an indirect one, to anything that
stops the scripted processor). However, this would only work for Jython
[2], JRuby [3], Groovy, and any other included JSR-223 language that
supports Java annotations. I've written this up as [4].
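
The reload behaviour described above can be sketched as a small Python model. It is a simplification, not InvokeScriptedProcessor's source: `ToyInvokeScripted` and its snapshot comparison are hypothetical, and only the property names come from this message. Scheduling reaches the script's initialize() only when a script-related property differs from the last load, which is why it acts like @OnScheduled only after a change.

```python
from typing import Dict, Optional, Tuple

# Property names as mentioned in this thread; the reload check itself is
# a simplified, hypothetical model of InvokeScriptedProcessor.
RELOAD_PROPERTIES = ("Script Engine", "Script File", "Script Body", "Module Directory")


class ToyInvokeScripted:
    def __init__(self) -> None:
        self.properties: Dict[str, str] = {}
        self._loaded: Optional[Tuple[Optional[str], ...]] = None
        self.initialize_calls = 0

    def set_property(self, name: str, value: str) -> None:
        self.properties[name] = value

    def _snapshot(self) -> Tuple[Optional[str], ...]:
        return tuple(self.properties.get(p) for p in RELOAD_PROPERTIES)

    def on_scheduled(self) -> None:
        snapshot = self._snapshot()
        if snapshot != self._loaded:
            # Script must be (re)loaded: the only path that reaches the
            # scripted processor's initialize() in this model.
            self._loaded = snapshot
            self.initialize_calls += 1
        # Otherwise the previously loaded script is reused as-is.


if __name__ == "__main__":
    p = ToyInvokeScripted()
    p.set_property("Script Body", "v1")
    p.on_scheduled()                      # first start: loads script
    p.on_scheduled()                      # restart, nothing changed
    p.set_property("Script Body", "v2")
    p.on_scheduled()                      # body edited: reloads
    print(p.initialize_calls)             # 2
```

Stopping and restarting without edits leaves `initialize_calls` unchanged, and the model has no stop hook at all, mirroring the missing @OnStopped equivalent.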


Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI-1822
[2] http://www.fiber-space.de/jynx/doc/jannotations.html
[3] https://github.com/jruby/jruby/wiki/JRuby-Reference#java_annotation
[4] https://issues.apache.org/jira/browse/NIFI-2215