You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@nifi.apache.org by Bas van Kortenhof <ba...@sanoma.com> on 2017/02/06 09:50:25 UTC

Problem when using backpressure to distribute load over nodes in a cluster

Hi all,

In a clustered NiFi setup I have a flow (see below) which basically consists
of a GetSQS processor that receives notifications of added files in a S3
bucket, a FetchS3Object processor that downloads the files and a custom
processor that parses the data. Because of the size of the files the fetch
and parse processors take minutes to run. 



My goal is to get the nodes in the cluster to each process one file at the
time. However, when I set the file threshold of the two connections to 1 it
can happen that when two files become available and one node is currently
parsing the first file, that it also picks up the second file on that node
because the first connection is empty. However, in this case I want another
node to pick up the file as they have more resources available. This problem
becomes even bigger when other short running processors are added to the
flow (for instance UpdateAttributes processors) as each of the connections
required to fit these connections in the flow can then be filled by a
flowfile, even though other nodes are idle.

I tried setting the threshold of the connections to 0 but this does not seem
to work as NiFi then seems to ignore this value (the processor before such a
connection is not halted). Does anyone know a way to achieve this behaviour?



--
View this message in context: http://apache-nifi-users-list.2361937.n4.nabble.com/Problem-when-using-backpressure-to-distribute-load-over-nodes-in-a-cluster-tp863.html
Sent from the Apache NiFi Users List mailing list archive at Nabble.com.

Re: Problem when using backpressure to distribute load over nodes in a cluster

Posted by Koji Kawamura <ij...@gmail.com>.
Hi Bas,

It worked as expected (at least for me).

In a processor, it's possible to transfer incoming FlowFile back to
itself, so the processor can investigate the FlowFile and free to
decide put it back or transfer it to other relationship.

I've created a JIRA NIFI-3452, and submit a Pull Request for that.
https://issues.apache.org/jira/browse/NIFI-3452
https://github.com/apache/nifi/pull/1490

Would you help reviewing the PR to see if that change works for your use-case?
https://cwiki.apache.org/confluence/display/NIFI/Contributor+Guide#ContributorGuide-CodeReviewProcess

Please let us know if there's anything on reviewing process.

Thanks,
Koji

On Thu, Feb 9, 2017 at 5:24 PM, Bas van Kortenhof
<ba...@sanoma.com> wrote:
> Hi Koji,
>
> That looks like a very neat approach to solving the problem indeed. However,
> I suspect keeping the files in the upstream connection would have some
> challenges as the processor doesn't yet know about the files it has to
> monitor, right?
>
> Anyway, thanks for looking into this. Let me know if you need some extra
> hands for anything related to this, if so I'll see if I can make some time.
>
> Regards,
> Bas
>
>
>
> --
> View this message in context: http://apache-nifi-users-list.2361937.n4.nabble.com/Problem-when-using-backpressure-to-distribute-load-over-nodes-in-a-cluster-tp863p900.html
> Sent from the Apache NiFi Users List mailing list archive at Nabble.com.

Re: Problem when using backpressure to distribute load over nodes in a cluster

Posted by Bas van Kortenhof <ba...@sanoma.com>.
Hi Koji,

That looks like a very neat approach to solving the problem indeed. However,
I suspect keeping the files in the upstream connection would have some
challenges as the processor doesn't yet know about the files it has to
monitor, right?

Anyway, thanks for looking into this. Let me know if you need some extra
hands for anything related to this, if so I'll see if I can make some time.

Regards,
Bas



--
View this message in context: http://apache-nifi-users-list.2361937.n4.nabble.com/Problem-when-using-backpressure-to-distribute-load-over-nodes-in-a-cluster-tp863p900.html
Sent from the Apache NiFi Users List mailing list archive at Nabble.com.

Re: Problem when using backpressure to distribute load over nodes in a cluster

Posted by Koji Kawamura <ij...@gmail.com>.
Hi Bas,

Sorry for the late reply.

Thanks for the clarification, I over simplified the flow. As you
experienced, NiFi back pressure is handled per relationship and as
long as a relationship has room to receive new flow files, source
processor is scheduled to run.

I don't think there's an existing solution to block the source
processor as you desired.

However, I found a possible improvement to achieve that.
NiFi scheduler checks downstream relationship availability, when it's
full, the processor won't be scheduled to run.
In case a source processor has multiple outgoing relationships, and if
ANY of those is full, the processor won't be scheduled.

(This is how processor scheduling works with back-pressure, but can
alter with @TriggerWhenAnyDestinationAvailable annotation.
DistributeLoad is the only processor annotated with this)

So, I think we could use this mechanism to keep the source processor
waiting to be scheduled, by following flow:

GetSQS
  -- success --> FetchS3Object --> Parse --> Notify
  -- success --> Wait

I propose to improve Wait so that user can choose how waiting FlowFile
is handled, from either:
"Route to 'wait' relationship" or "Keep in the Upstream connection".
Currently it has only option to route to 'wait'.

Use "Keep in the Upstream connection" Wait mode with the flow above,
the incoming flow file in GetSQS -> Wait connection stays there until
actual data processing finishes and Notify sends a notification
signal.

I will experiment with this idea and if it works,
I'll submit a JIRA for this and try to add this capability since I've
been working on Wait/Notify processors recently.

Thanks again for sharing your use-case!

Koji


On Tue, Feb 7, 2017 at 6:05 PM, Bas van Kortenhof
<ba...@sanoma.com> wrote:
> Hi Koji,
>
> Thanks for the quick response. I have set the batch size to 1 indeed, and
> the flow you describe works, but my problem is a bit more complex. I'll try
> to show it with an example:
>
>
>
> In this case Node 1 is parsing a flow file (indicated by the X in the
> connection between FetchS3Object and Parse). Both connections have a
> backpressure threshold of 1, but because the object is already fetched, the
> first connection is empty and can thus be filled. This means that, if a new
> item becomes available in the queue, both of the following cases can happen
> with equal probability:
>
>
>
> I'd like to force the second case to happen, because node 2 has more
> resources available.
>
> I hope this explains the situation a bit better. So basically I want the
> backpressure to occur based on a threshold on the whole flow, not an
> individual connection. I haven't found a way to do this up to this point.
>
> Hopefully you have an idea how to achieve this.
>
> Regards,
> Bas
>
>
>
> --
> View this message in context: http://apache-nifi-users-list.2361937.n4.nabble.com/Problem-when-using-backpressure-to-distribute-load-over-nodes-in-a-cluster-tp863p877.html
> Sent from the Apache NiFi Users List mailing list archive at Nabble.com.

Re: Problem when using backpressure to distribute load over nodes in a cluster

Posted by Bas van Kortenhof <ba...@sanoma.com>.
Hi Koji,

Thanks for the quick response. I have set the batch size to 1 indeed, and
the flow you describe works, but my problem is a bit more complex. I'll try
to show it with an example:



In this case Node 1 is parsing a flow file (indicated by the X in the
connection between FetchS3Object and Parse). Both connections have a
backpressure threshold of 1, but because the object is already fetched, the
first connection is empty and can thus be filled. This means that, if a new
item becomes available in the queue, both of the following cases can happen
with equal probability:



I'd like to force the second case to happen, because node 2 has more
resources available.

I hope this explains the situation a bit better. So basically I want the
backpressure to occur based on a threshold on the whole flow, not an
individual connection. I haven't found a way to do this up to this point.

Hopefully you have an idea how to achieve this.

Regards,
Bas



--
View this message in context: http://apache-nifi-users-list.2361937.n4.nabble.com/Problem-when-using-backpressure-to-distribute-load-over-nodes-in-a-cluster-tp863p877.html
Sent from the Apache NiFi Users List mailing list archive at Nabble.com.

Re: Problem when using backpressure to distribute load over nodes in a cluster

Posted by Koji Kawamura <ij...@gmail.com>.
Hi Bas,

I think I was able to create a NiFi flow that works as you expected.
Multiple GetSQS share workload using NiFi back pressure.

Did you set Batch Size of GetSQS to 1?

I've put a flow template and detailed description on this Gist.
https://gist.github.com/ijokarumawak/4a9189ac630cf6cf6cd2d35c19b43fd8

Hope this helps, thanks!
Koji

On Mon, Feb 6, 2017 at 6:50 PM, Bas van Kortenhof
<ba...@sanoma.com> wrote:
> Hi all,
>
> In a clustered NiFi setup I have a flow (see below) which basically consists
> of a GetSQS processor that receives notifications of added files in a S3
> bucket, a FetchS3Object processor that downloads the files and a custom
> processor that parses the data. Because of the size of the files the fetch
> and parse processors take minutes to run.
>
>
>
> My goal is to get the nodes in the cluster to each process one file at the
> time. However, when I set the file threshold of the two connections to 1 it
> can happen that when two files become available and one node is currently
> parsing the first file, that it also picks up the second file on that node
> because the first connection is empty. However, in this case I want another
> node to pick up the file as they have more resources available. This problem
> becomes even bigger when other short running processors are added to the
> flow (for instance UpdateAttributes processors) as each of the connections
> required to fit these connections in the flow can then be filled by a
> flowfile, even though other nodes are idle.
>
> I tried setting the threshold of the connections to 0 but this does not seem
> to work as NiFi then seems to ignore this value (the processor before such a
> connection is not halted). Does anyone know a way to achieve this behaviour?
>
>
>
> --
> View this message in context: http://apache-nifi-users-list.2361937.n4.nabble.com/Problem-when-using-backpressure-to-distribute-load-over-nodes-in-a-cluster-tp863.html
> Sent from the Apache NiFi Users List mailing list archive at Nabble.com.