Posted to users@nifi.apache.org by Eric Chaves <er...@uolet.com> on 2019/03/27 19:11:52 UTC

Clarifications on getting flowfiles using FlowFileFilters

Hi folks,

I'd like to write an InvokeScriptedProcessor script that inspects all
flowfiles in the incoming queue without actually processing/transferring
them until a given business condition is met.

So I looked for some examples of how I could get all flowfiles in a queue
using session.get(FlowFileFilter), and I would like to confirm that I'm
getting it right.

1) If more than one thread is executing my processor and calling
session.get(FlowFileFilter), can the same flowfile be presented to both
threads?

2) If I retrieve some flowfiles using session.get(FlowFileFilter), can I
add/modify their attributes but keep them in the current queue (i.e. set
attributes but not transfer them to any relationship)?

3) Am I correct that session.get(FlowFileFilter) requires neither
session.commit nor session.rollback, and that those are required only if I
modify the flowfile?

4) When I obtain a list of flowfiles using session.get(FlowFileFilter),
does it mark/lock/hide the selected files in any way?

5) I'm playing with this concept in a Groovy script using an
InvokeScriptedProcessor, and very often, after the processor's second or
third onTrigger execution, I receive an error: *java.io.IOException: All
Partitions have been blacklisted due to failures when attempting to
update. If the Write-Ahead Log is able to perform a checkpoint, this issue
may resolve itself. Otherwise, manual intervention will be required.*
What could be the source of this error? Could it be related to the use of
session.get(FlowFileFilter)? I'm having a hard time detecting what I'm
doing that raises this error.

Thanks in advance!
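The question centres on session.get(FlowFileFilter), so its selection mechanics can be sketched with a toy, self-contained Java model. The four enum values mirror NiFi's `FlowFileFilter.FlowFileFilterResult`; `FilterSketch`, `ToyFilter` and `ToyQueue` are hypothetical stand-ins, not NiFi classes. The model walks the queue in insertion order, whereas a real connection's ordering depends on its configured prioritizers.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Toy model of filter-based polling; NOT NiFi code. The enum mirrors NiFi's
// FlowFileFilter.FlowFileFilterResult values, everything else is hypothetical.
final class FilterSketch {

    enum FilterResult {
        ACCEPT_AND_TERMINATE, ACCEPT_AND_CONTINUE, REJECT_AND_TERMINATE, REJECT_AND_CONTINUE;

        boolean accepted() { return this == ACCEPT_AND_TERMINATE || this == ACCEPT_AND_CONTINUE; }

        boolean keepGoing() { return this == ACCEPT_AND_CONTINUE || this == REJECT_AND_CONTINUE; }
    }

    // Hypothetical stand-in for FlowFileFilter; a flowfile is reduced to its attribute map.
    interface ToyFilter {
        FilterResult filter(Map<String, String> attributes);
    }

    // Hypothetical incoming queue, examined in insertion order.
    static final class ToyQueue {
        private final LinkedList<Map<String, String>> items = new LinkedList<>();

        synchronized void offer(Map<String, String> flowFile) { items.add(flowFile); }

        synchronized int size() { return items.size(); }

        // Examines queued flowfiles in order, takes the accepted ones out of the
        // queue, and stops at the first *_TERMINATE result.
        synchronized List<Map<String, String>> get(ToyFilter filter) {
            List<Map<String, String>> selected = new ArrayList<>();
            Iterator<Map<String, String>> it = items.iterator();
            while (it.hasNext()) {
                Map<String, String> flowFile = it.next();
                FilterResult result = filter.filter(flowFile);
                if (result.accepted()) {
                    selected.add(flowFile);
                    it.remove();
                }
                if (!result.keepGoing()) {
                    break;
                }
            }
            return selected;
        }
    }
}
```

In this model, returning `ACCEPT_AND_CONTINUE` for every flowfile drains the whole queue in one call (the "get everything" case asked about here), while a `*_TERMINATE` result is how a filter caps the batch early.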

Re: Clarifications on getting flowfiles using FlowFileFilters

Posted by Eric Chaves <er...@uolet.com>.
Hi Matt, thanks for your explanations. I was coding with some
misconceptions, but after your answer I rewrote my script and now it's
working like a charm.

Thanks!!

On Thu, Mar 28, 2019 at 14:22, Matt Burgess <ma...@apache.org> wrote:


Re: Clarifications on getting flowfiles using FlowFileFilters

Posted by Matt Burgess <ma...@apache.org>.
Eric,

Here are some answers to your questions:

1) The same flowfile will not be presented to both threads.

2) Any flow files retrieved from a session have to be handled with
that session. You'd either have to keep a reference to the session and
the flow file(s) -- see MergeContent and MergeRecord for examples --
or you can keep a map from FlowFile UUID to an attribute map, then
update the flow file with the attributes when it is time to transfer
it. In the meantime, you'd either roll back the session or transfer the
flow file to SELF (i.e. back to the queue it came from).

3) No, any flow files retrieved from a session have to be handled with
that session, either by transferring them (and committing) or by
removing them.

4) Not sure what you mean by mark/lock/hide, but each flow file
retrieved from a session is associated only with that session/thread.
It's kind of "locked" in the sense that while the session is active,
no other thread/session knows about it.

5) Not sure what's going on there, but that message comes from the
write-ahead log behind the FlowFile repository, so I assume you're
updating the flow files at the time? Maybe put some logging statements
around the session/flowfile operations to see if you can narrow down
what's going on. Also, which write-ahead log implementation is your
FlowFile repository using? I think there's a new one that's not yet the
default but works better.

Regards,
Matt
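The session rules in Matt's answers 1 to 4, and the UUID-to-attributes pattern from answer 2, can be sketched with a toy, single-JVM Java model. It is not NiFi's ProcessSession: `SessionSketch`, `ToyQueue`, `ToySession`, `onTrigger` and the `inspections` attribute are all hypothetical, and the `IllegalStateException` only loosely stands in for however the real framework reports unhandled flowfiles. In an actual processor the corresponding calls would be `session.get(FlowFileFilter)`, `session.putAllAttributes(flowFile, attrs)`, `session.transfer(flowFile, relationship)` and `session.rollback()`.

```java
import java.util.*;

// Toy, single-JVM model of the session rules described above; NOT NiFi's
// ProcessSession. Every class, method and attribute name here is hypothetical.
final class SessionSketch {

    static final class ToyFlowFile {
        final String uuid;
        final Map<String, String> attributes = new HashMap<>();
        ToyFlowFile(String uuid) { this.uuid = uuid; }
    }

    // Shared incoming queue: a polled flowfile is no longer visible to other sessions.
    static final class ToyQueue {
        private final Deque<ToyFlowFile> items = new ArrayDeque<>();
        synchronized void offer(ToyFlowFile ff) { items.addLast(ff); }
        synchronized ToyFlowFile poll() { return items.pollFirst(); }
        synchronized int size() { return items.size(); }
    }

    static final class ToySession {
        private final ToyQueue queue;
        private final List<ToyFlowFile> claimed = new ArrayList<>();
        private final Map<ToyFlowFile, List<ToyFlowFile>> routes = new HashMap<>(); // null = back to queue (SELF)
        private final Set<ToyFlowFile> removed = new HashSet<>();
        private final Map<ToyFlowFile, Map<String, String>> staged = new HashMap<>();

        ToySession(ToyQueue queue) { this.queue = queue; }

        // Claims every queued flowfile for this session (like an accept-all filter).
        List<ToyFlowFile> getAll() {
            List<ToyFlowFile> batch = new ArrayList<>();
            for (ToyFlowFile ff = queue.poll(); ff != null; ff = queue.poll()) batch.add(ff);
            claimed.addAll(batch);
            return batch;
        }

        void putAttribute(ToyFlowFile ff, String key, String value) {
            requireClaimed(ff);
            staged.computeIfAbsent(ff, k -> new HashMap<>()).put(key, value);
        }

        void transfer(ToyFlowFile ff, List<ToyFlowFile> destination) { requireClaimed(ff); routes.put(ff, destination); }

        void remove(ToyFlowFile ff) { requireClaimed(ff); removed.add(ff); }

        // Refuses to commit while any claimed flowfile is unaccounted for.
        void commit() {
            for (ToyFlowFile ff : claimed) {
                if (!routes.containsKey(ff) && !removed.contains(ff)) {
                    throw new IllegalStateException(ff.uuid + " was neither transferred nor removed");
                }
            }
            for (ToyFlowFile ff : claimed) {
                if (removed.contains(ff)) continue;
                ff.attributes.putAll(staged.getOrDefault(ff, Map.of()));
                List<ToyFlowFile> destination = routes.get(ff);
                if (destination == null) queue.offer(ff); else destination.add(ff);
            }
            reset();
        }

        // Returns every claimed flowfile to the queue and discards staged attribute changes.
        void rollback() { claimed.forEach(queue::offer); reset(); }

        private void requireClaimed(ToyFlowFile ff) {
            if (!claimed.contains(ff)) throw new IllegalStateException(ff.uuid + " is not owned by this session");
        }

        private void reset() { claimed.clear(); routes.clear(); removed.clear(); staged.clear(); }
    }

    // Hypothetical trigger following answer 2: remember attributes by UUID across
    // triggers and write them onto the flowfiles only when releasing them.
    static void onTrigger(ToyQueue queue, Map<String, Map<String, String>> pendingByUuid,
                          boolean conditionMet, List<ToyFlowFile> success) {
        ToySession session = new ToySession(queue);
        List<ToyFlowFile> batch = session.getAll();
        for (ToyFlowFile ff : batch) {
            pendingByUuid.computeIfAbsent(ff.uuid, k -> new HashMap<>())
                    .merge("inspections", "1", (a, b) -> String.valueOf(Integer.parseInt(a) + 1));
        }
        if (!conditionMet) {
            session.rollback(); // nothing is released this time
            return;
        }
        for (ToyFlowFile ff : batch) {
            Map<String, String> attrs = pendingByUuid.remove(ff.uuid);
            if (attrs != null) attrs.forEach((k, v) -> session.putAttribute(ff, k, v));
            session.transfer(ff, success);
        }
        session.commit();
    }
}
```

Keeping the pending map outside the session is what lets rolled-back triggers lose nothing. In a processor running several concurrent tasks that map would need to be thread-safe (for example a `ConcurrentHashMap`), and since it lives only in memory it does not survive a restart.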
