Posted to users@nifi.apache.org by Oleksandr Lobunets <al...@gmail.com> on 2018/03/07 17:34:13 UTC

A combination of ExecuteProcess and ExecuteStream processors?

Hello everyone,

I have a case of running a 3rd-party CLI (Linux) with the following behaviour:
 - Should be triggered by an incoming FlowFile whose attributes/content contain the parameters for the CLI
 - Accepts params via flags or environment variables
 - Writes output to stdout as a stream of JSON objects
 - The output might be huge (millions and millions of objects), which means caching stdout is not an option - each line/object should be sent as a separate FlowFile
 - Errors/logs are written to stderr (might be very chatty)

Using ExecuteProcess is not an option (it cannot be triggered by an incoming FlowFile), but the way it treats stdout is what I want.
Using ExecuteStreamCommand is not an option either, as it buffers the output until the binary exits with status code 0.

Does anybody know if there’s a hybrid component somewhere out there? ;-)

Thank you in advance!

P.S. I’ve tried to write a wrapper script in Python using the ExecuteScript processor, but:
 - it looks like overkill (JVM -> Jython -> Python -> System process -> …)
 - scripting for NiFi does not provide a pleasant debugging experience
 - I get weird random errors when moving the flow from machine to machine, even between exact copies of VMs (like the example below).

> Caused by: javax.script.ScriptException: AttributeError: type object 'java.lang.Thread' has no attribute 'State' in <script> at line number 1
>         at org.python.jsr223.PyScriptEngine.scriptException(PyScriptEngine.java:222)
>         at org.python.jsr223.PyScriptEngine.eval(PyScriptEngine.java:59)
>         at org.python.jsr223.PyScriptEngine.eval(PyScriptEngine.java:31)
>         at javax.script.AbstractScriptEngine.eval(AbstractScriptEngine.java:264)
>         at org.apache.nifi.script.impl.JythonScriptEngineConfigurator.eval(JythonScriptEngineConfigurator.java:59)
>         at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:220)


Kind regards,
Alexander

Re: A combination of ExecuteProcess and ExecuteStream processors?

Posted by Oleksandr Lobunets <al...@gmail.com>.
Hi Boris, Matt,

Thank you for the prompt answers and suggestions.

@Matt, right, this capability would be great to have. I will proceed with submission of the improvement request.

@Boris, the blog article is helpful - I’ve run into similar frustration cycles with a hanging processor and having to kill processes manually.
I’m going to give Groovy a try now - it looks like ProcessBuilder with line-by-line reads from the process is the way to go in my case.
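
For reference, the ProcessBuilder approach could look roughly like the sketch below. It is plain Java rather than a Groovy ExecuteScript body, and `LineStreamer`, `run` and the callback parameters are hypothetical names made up for illustration, not NiFi API. The assumption is that the CLI emits one JSON object per line, so each line can be handed off (for example, turned into a FlowFile) as soon as it is read.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper, not part of NiFi: runs a CLI and hands each stdout line
// to a callback as soon as it is read, so the full output is never buffered.
final class LineStreamer {

    static int run(List<String> command,
                   Map<String, String> extraEnv,
                   Consumer<String> onStdoutLine,
                   Consumer<String> onStderrLine)
            throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder(command);
        builder.environment().putAll(extraEnv);   // parameters passed via environment
        Process process = builder.start();
        process.getOutputStream().close();        // the CLI gets no stdin in this variant

        // Drain stderr on its own thread; a chatty CLI could otherwise fill the
        // pipe buffer and block, which looks like a hanging processor.
        Thread stderrDrainer = new Thread(() -> {
            try (BufferedReader err = new BufferedReader(new InputStreamReader(
                    process.getErrorStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = err.readLine()) != null) {
                    onStderrLine.accept(line);
                }
            } catch (IOException e) {
                onStderrLine.accept("stderr reader failed: " + e);
            }
        });
        stderrDrainer.setDaemon(true);
        stderrDrainer.start();

        // Read stdout line by line on the calling thread.
        try (BufferedReader out = new BufferedReader(new InputStreamReader(
                process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = out.readLine()) != null) {
                onStdoutLine.accept(line);        // assumed: one JSON object per line
            }
        }
        stderrDrainer.join();
        return process.waitFor();                 // exit code of the CLI
    }
}
```

Inside a script processor the stdout callback would be the place to create and transfer a FlowFile per line; that part is left out here because it depends on the NiFi session handling.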

Since we’ve touched on this topic, here’s another variation of process execution I had to deal with:
 - a FlowFile triggers execution of a CLI (parametrized via flags or environment)
 - the content of subsequent FlowFiles is passed to the CLI’s stdin
 - the CLI produces output (line by line) on stdout (the same Batch Duration behaviour is expected)
 - once the flow of incoming data is over (which might be indicated by a special FlowFile), stdin is closed, which signals the CLI to exit properly

So far, I have implemented it using ExecuteStreamCommand, first aggregating the FlowFiles’ content (via MergeContent) to keep the input side small, and then splitting stdout (which arrives as a single FlowFile) into separate FlowFiles. This is not the optimal way to go, and it took some time to guess a merge size that does not produce huge output (as it’s buffered). This sounds like a completely different type of processor, one that processes a group (in FBP terms)…
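
Outside of NiFi, the stdin-fed variation can be sketched in plain Java as below. This is only an illustration under assumptions: `StdinFedProcess`, `send` and `finish` are hypothetical names, input is assumed to be line-oriented, and stderr is simply inherited from the parent process. Stdout is read on a separate thread so that writing to stdin and reading from stdout cannot block each other.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical wrapper around a long-lived CLI that is fed through stdin.
final class StdinFedProcess {
    private final Process process;
    private final BufferedWriter stdin;
    private final Thread stdoutReader;

    StdinFedProcess(List<String> command, Consumer<String> onStdoutLine) throws IOException {
        Process started = new ProcessBuilder(command)
                .redirectError(ProcessBuilder.Redirect.INHERIT) // stderr goes to the parent's stderr
                .start();
        BufferedReader stdout = new BufferedReader(new InputStreamReader(
                started.getInputStream(), StandardCharsets.UTF_8));
        // Read stdout concurrently so a full output pipe never stalls the writer.
        Thread reader = new Thread(() -> {
            try (BufferedReader r = stdout) {
                String line;
                while ((line = r.readLine()) != null) {
                    onStdoutLine.accept(line);
                }
            } catch (IOException e) {
                // The pipe was closed underneath us; nothing more to read.
            }
        });
        reader.start();
        this.process = started;
        this.stdin = new BufferedWriter(new OutputStreamWriter(
                started.getOutputStream(), StandardCharsets.UTF_8));
        this.stdoutReader = reader;
    }

    // Pass one line of input (e.g. the content of one FlowFile) to the CLI.
    void send(String line) throws IOException {
        stdin.write(line);
        stdin.write('\n');
        stdin.flush();
    }

    // Close stdin to signal end of input, then wait for remaining output and exit.
    int finish() throws IOException, InterruptedException {
        stdin.close();
        stdoutReader.join();
        return process.waitFor();
    }
}
```

With `cat` as a stand-in CLI, each line passed to `send` comes back through the stdout callback, and `finish` returns once `cat` sees end of input and exits.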

Kind regards,
Oleksandr



Re: A combination of ExecuteProcess and ExecuteStream processors?

Posted by Matt Burgess <ma...@apache.org>.
Alexander,

It sounds like you'd like to see the Batch Duration capability from
ExecuteProcess added to ExecuteStreamCommand. Please feel free to
write a Jira case [1] for this improvement.
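
To make the idea of duration-based batching concrete, here is a rough sketch in plain Java. It is not how ExecuteProcess implements Batch Duration; `DurationBatcher` and its parameters are hypothetical names, and the clock is injected only so the behaviour can be checked without sleeping. Lines are collected into a batch, and the batch is emitted once a line arrives after the window has elapsed.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical helper: groups output lines into batches by elapsed time.
final class DurationBatcher {
    private final long windowNanos;
    private final LongSupplier nanoClock;           // e.g. System::nanoTime
    private final Consumer<List<String>> onBatch;   // e.g. write one FlowFile per batch
    private final List<String> current = new ArrayList<>();
    private long batchStart;

    DurationBatcher(Duration window, LongSupplier nanoClock, Consumer<List<String>> onBatch) {
        this.windowNanos = window.toNanos();
        this.nanoClock = nanoClock;
        this.onBatch = onBatch;
    }

    // Add a line; emit the previous batch first if its window has elapsed.
    void accept(String line) {
        long now = nanoClock.getAsLong();
        if (current.isEmpty()) {
            batchStart = now;
        } else if (now - batchStart >= windowNanos) {
            flush();
            batchStart = now;
        }
        current.add(line);
    }

    // Emit whatever is pending; call this once the process has exited.
    void flush() {
        if (!current.isEmpty()) {
            onBatch.accept(new ArrayList<>(current));
            current.clear();
        }
    }
}
```

One limitation of this sketch is that a batch is only emitted when another line arrives or `flush` is called explicitly, so a quiet process holds its last batch until it exits.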

In the meantime, I second Boris's thought on using Groovy to launch
your script. It's much more integrated with the NiFi API (as they are
both native to the JVM), and the Jython error you're running into
has not been fixed yet [2].

Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI
[2] http://bugs.jython.org/issue2642


Re: A combination of ExecuteProcess and ExecuteStream processors?

Posted by Boris Tyukin <bo...@boristyukin.com>.
Hi Alexander,

I had a similar task, but in my case the flowfiles were tiny. It might still
work well for you, because you can stream stdout / stderr in real time. Check my
blog post.

I also wanted to use Jython, but Groovy is a really fun language whose
basics you can grasp in hours. It also does not have the performance
issues that Jython has.

http://boristyukin.com/how-to-run-sqoop-from-nifi/

Boris
