Posted to dev@nifi.apache.org by "Kessler, Jon" <Ke...@varentech.com> on 2019/10/17 11:24:57 UTC

[Discuss] Data prioritization - A proposed solution

I want to start a discussion about a new prioritization mechanism that addresses some of the issues that I believe exist in the current solution. These issues are:

 - Scheduling: No consideration is given to data priority when determining which component is given the next available thread with which to work
 - Constant sorting: Because all FlowFiles in a given connection share the same PriorityQueue, they must be sorted every time they move. While this sort is efficient, its cost adds up as queues grow deep.
 - Administration: There is a costly human element to managing the value used as a priority ranking as priorities change. You must also ensure every connection in the appropriate flow has the proper prioritizer assigned to it to make use of the property.

We have developed a prototype of a new FlowFileQueue implementation that addresses these issues. Use of this implementation is controlled via nifi.properties, so you can opt in or out system-wide without a lot of configuration. Its design goals are:

  - Instead of using the value of a FlowFile attribute as a ranking, maintain a set of Expression Language rules to define your priorities. The highest ranked rule that a given FlowFile satisfies will be that FlowFile's priority.
  - Because we have a finite set of priority rules we can use a bucket sort in our connections, with one bucket per priority rule. The bucket/rule with which a FlowFile is associated will be maintained so that as it moves through the system we do not have to re-evaluate that FlowFile against our ruleset unless we have reason to do so.
  - Control where in your flow FlowFiles are evaluated against the ruleset with a new Prioritizer implementation: BucketPrioritizer.
  - When this queue implementation is polled it will be able to check state to see if any data of a higher priority than what it currently contains recently (within 5s) moved elsewhere in the system. If higher priority data has recently moved elsewhere, the connection will only provide a FlowFile X% of the time where X is defined along with the rule. This allows higher priority data to have more frequent access to threads without thread-starving lower priority data.
  - Rules will be managed via a menu option for the flow and changes to them take effect instantly. This allows you to change your priorities without stopping/editing/restarting various components on the graph.
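
As an illustration of the first two design goals, here is a minimal sketch of ranking by rule and keeping one FIFO bucket per rule. It is not the prototype's code. The names PriorityRule and BucketQueueSketch are hypothetical, a Java Predicate over an attribute map stands in for an Expression Language rule, and the trailing default bucket for FlowFiles that match no rule is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for a priority rule; a real rule would wrap an
// Expression Language expression rather than a Java Predicate.
final class PriorityRule {
    final String name;
    final Predicate<Map<String, String>> matches; // tested against FlowFile attributes

    PriorityRule(String name, Predicate<Map<String, String>> matches) {
        this.name = name;
        this.matches = matches;
    }
}

// One FIFO bucket per rule, plus an assumed default bucket for FlowFiles that match no rule.
final class BucketQueueSketch {
    private final List<PriorityRule> rules; // index 0 = highest ranked rule
    private final List<Deque<Map<String, String>>> buckets = new ArrayList<>();

    BucketQueueSketch(List<PriorityRule> rankedRules) {
        this.rules = rankedRules;
        for (int i = 0; i <= rankedRules.size(); i++) {
            buckets.add(new ArrayDeque<>());
        }
    }

    // Returns the index of the highest ranked rule the attributes satisfy,
    // or rules.size() (the default bucket) when no rule matches.
    int evaluate(Map<String, String> attributes) {
        for (int i = 0; i < rules.size(); i++) {
            if (rules.get(i).matches.test(attributes)) {
                return i;
            }
        }
        return rules.size();
    }

    // Insertion cost depends on the number of rules, not on queue depth.
    void put(Map<String, String> attributes) {
        buckets.get(evaluate(attributes)).addLast(attributes);
    }

    // Serves the highest priority non-empty bucket; FIFO within a bucket.
    Map<String, String> poll() {
        for (Deque<Map<String, String>> bucket : buckets) {
            if (!bucket.isEmpty()) {
                return bucket.pollFirst();
            }
        }
        return null;
    }
}
```

The point of the sketch is that no comparison against already-queued FlowFiles is needed on insertion, which is what removes the per-move sort.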

I intend to contribute this solution but first want to solicit input and opinions.

  - Jon Kessler


Re: EXT: Re: Re: [Discuss] Data prioritization - A proposed solution

Posted by Bryan Bende <bb...@gmail.com>.
Jon,

I've added you to the contributors role in JIRA.

Thanks,

Bryan

On Fri, Nov 1, 2019 at 11:40 AM Kessler, Jon <Ke...@varentech.com> wrote:
>
> I finally got around to creating this ticket: https://issues.apache.org/jira/browse/NIFI-6831. May I please have contributor status so that I can assign it to myself and submit a PR?

Re: EXT: Re: Re: [Discuss] Data prioritization - A proposed solution

Posted by "Kessler, Jon" <Ke...@varentech.com>.
I finally got around to creating this ticket: https://issues.apache.org/jira/browse/NIFI-6831. May I please have contributor status so that I can assign it to myself and submit a PR?
________________________________
From: Kessler, Jon <Ke...@varentech.com>
Sent: Thursday, October 17, 2019 11:54 AM
To: dev@nifi.apache.org <de...@nifi.apache.org>
Subject: EXT: Re: Re: [Discuss] Data prioritization - A proposed solution

Joe, hopefully I addressed all of your questions:

Your interpretation of the scheduling aspect is correct. These queues will pretend to be empty a certain % of the time if higher priority data recently moved elsewhere. That % is configurable on a per rule basis which allows the operator to determine how much to stagger the data associated with each rule. That % is also how the rules are ranked in terms of order of priority. The higher the %, the more often a rule will make use of its threads so the higher its priority is considered to be.
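
The "pretend to be empty" behaviour described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the prototype: StaggerGate and its methods are hypothetical names, ranks are plain indices (0 = highest priority, which per the description would be the rule with the highest %), and the clock and random roll are passed in as parameters so the logic can be checked deterministically.

```java
import java.util.Arrays;

// Hypothetical gate consulted by a queue before it hands out a FlowFile.
final class StaggerGate {
    static final long WINDOW_MILLIS = 5_000L; // "within 5s" from the proposal

    // Last time data for each rule moved anywhere in the system; index = rank.
    private final long[] lastMovedMillis;

    StaggerGate(int ruleCount) {
        lastMovedMillis = new long[ruleCount];
        Arrays.fill(lastMovedMillis, Long.MIN_VALUE); // MIN_VALUE = never moved
    }

    // Called whenever a FlowFile of the given rank moves between connections.
    synchronized void recordMove(int rank, long nowMillis) {
        lastMovedMillis[rank] = nowMillis;
    }

    // True if any higher priority rule saw movement inside the window.
    synchronized boolean higherPriorityMovedRecently(int rank, long nowMillis) {
        for (int i = 0; i < rank; i++) {
            long last = lastMovedMillis[i];
            if (last != Long.MIN_VALUE && nowMillis - last <= WINDOW_MILLIS) {
                return true;
            }
        }
        return false;
    }

    // percent is the rule's configured X; roll is a random value in [0, 100).
    // Returning false corresponds to the queue pretending to be empty.
    boolean shouldOffer(int rank, int percent, long nowMillis, double roll) {
        if (!higherPriorityMovedRecently(rank, nowMillis)) {
            return true; // nothing more important is active, so serve normally
        }
        return roll < percent;
    }
}
```

In a running system the roll could come from ThreadLocalRandom.current().nextDouble(100.0) and the clock from System.currentTimeMillis().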

Administration: You are correct that the ruleset is provided at the flow controller level but will be leveraged by all connections regardless of whether or not they use the BucketPrioritizer (more details on this below). This overall solution only works if all FlowFileQueues are of this new implementation which is why I tied it to nifi.properties settings.

The sorting function here takes place on insertion into any connection on which a BucketPrioritizer is set. Once a FlowFile has been sorted into a bucket we maintain that state so that each time it moves into a new connection we already know in which bucket it should be placed without needing to have a BucketPrioritizer set on that connection. Each bucket in a connection is just a FIFO queue so no additional sorting is done. You should only have to configure connections to use the BucketPrioritizer at points in your flow where you believe you'll have enough information to accurately determine priority but not beyond that point unless you want to re-evaluate downstream for some reason. There is administration involved in setting these BucketPrioritizers on some connections but it should be minimal per flow (sometimes as few as one).

Some additional information: When you delete a rule, each FlowFile that was already associated with that rule will be re-evaluated against the ruleset the next time it moves, when it enters the next connection, regardless of whether or not a BucketPrioritizer is set on that connection. Also, FlowFiles that have yet to be evaluated (have yet to encounter a BucketPrioritizer) will not be staggered. This was a design decision: if we don't know what the priority is for a given FlowFile, we should get it to that point in the flow as soon as possible. This decision was a result of empirical evidence that when we did stagger unevaluated data, an incoming flow of high priority data slowed its own upstream processing down once it was identified and processed as high priority.
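
The bucket-retention and re-evaluation behaviour described in the last two paragraphs can be summarized in a small sketch. The class and method names (BucketAssigner, resolve) are hypothetical and not taken from the prototype, rules are modelled as Java Predicates rather than Expression Language, and the "default" bucket for FlowFiles that match no rule is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical helper deciding which bucket a FlowFile lands in when it enters a connection.
final class BucketAssigner {
    static final String DEFAULT_BUCKET = "default"; // assumption: catch-all when no rule matches

    // Insertion order doubles as rank: the first rule added is the highest priority.
    private final Map<String, Predicate<Map<String, String>>> rules = new LinkedHashMap<>();

    void addRule(String ruleId, Predicate<Map<String, String>> rule) {
        rules.put(ruleId, rule);
    }

    void deleteRule(String ruleId) {
        rules.remove(ruleId);
    }

    // Full evaluation against the ruleset: highest ranked matching rule wins.
    String evaluate(Map<String, String> attributes) {
        for (Map.Entry<String, Predicate<Map<String, String>>> e : rules.entrySet()) {
            if (e.getValue().test(attributes)) {
                return e.getKey();
            }
        }
        return DEFAULT_BUCKET;
    }

    // currentRuleId is the bucket the FlowFile already carries, or null if it
    // has not yet met a BucketPrioritizer (unevaluated, so not staggered).
    String resolve(String currentRuleId, Map<String, String> attributes, boolean hasBucketPrioritizer) {
        boolean ruleDeleted = currentRuleId != null
                && !DEFAULT_BUCKET.equals(currentRuleId)
                && !rules.containsKey(currentRuleId);
        if (hasBucketPrioritizer || ruleDeleted) {
            return evaluate(attributes); // (re-)evaluate against the current ruleset
        }
        return currentRuleId; // keep the remembered bucket, or stay unevaluated
    }
}
```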

Multi-tenancy: Agreed that a global priority list could be too restrictive for multi-tenancy and should be addressed.

Per swapping, this is an area where I admittedly need to put more thought into my implementation because there is plenty of room for improvement. Right now I'm just swapping files to disk in order of least to greatest priority but they are all stored together. Therefore they're read back into memory in order of least to greatest priority. More work should be done here.

  - Jon
________________________________
From: Joe Witt <jo...@gmail.com>
Sent: Thursday, October 17, 2019 8:12:52 AM
To: dev@nifi.apache.org
Subject: EXT: Re: [Discuss] Data prioritization - A proposed solution

Jon

Probably some details I don't quite understand yet so responses here are to
get there...

The concept for scheduling is interesting.  Does this basically work around
the fact that we have an unfair scheduler so this has queue implementations
which pretend data is not available when it knows that there is higher
priority data available elsewhere thus returning more threads to the pool
faster to increase the likelihood that queues with higher priority data
will get served more often?

The notion of prioritization implies there is a sorting function happening
somewhere.  NiFi now does sorting on insertion to every queue.  At what
points are you suggesting sorting can be done/reduced to?

Administration: The existing model does require each prioritizer to be set
for each queue.  Yours does as well - to opt into this you'd have to select
the BucketPrioritizer right?  It seems like you're saying the priority
ruleset would be provided at the flow controller level and be enforced by
all connections which leverage this prioritizer.  For large multi-tenant
nifi flows having a global ruleset might be too limiting but maybe we just
don't worry about that yet.

How does this idea work with the fact that queues, as they reach a given
threshold have their data swapped out to disk and as data gets worked off
the flowfiles get swapped back into memory?

Thanks
Joe


Re: Re: [Discuss] Data prioritization - A proposed solution

Posted by Tony Kurc <tk...@apache.org>.
On FBP purity - even absent this implementation, there are "global"
resources being contended over (disk, network, threads). Absent infinite
capacity in all those things, I don't think a pure implementation is
possible. In my opinion, what is being discussed is the best way of coping
with one aspect of resource contention while compromising as little as
possible.

Tony

On Fri, Oct 18, 2019 at 3:29 PM Joe Witt <jo...@gmail.com> wrote:

> I should clarify a bit....It is a perfectly fine thing to have an
> alternative queue implementation which will have awareness that other high
> priority data exists elsewhere and thus will intentionally respond as if
> data is not available on a given queue so that other queues with higher
> priority data will be serviced first.  This will be helpful for ensuring
> highest priority data gets worked first.
>
> My concerns relating to FBP purity as Andy brought up are important
> considerations but they would not necessarily block the inclusion of such
> an implementation.  We need to see it and talk through it for its own sake
> - not FBPs.
>
> At this stage I think you see there are plenty of folks interested in what
> this could mean.  It is clearly touching on something that could benefit
> from new ideas/improvement.  Hopefully you choose to make a PR/JIRA for it
> as that is probably the next best step.
>
>
>
> On Fri, Oct 18, 2019 at 1:59 PM Joe Witt <jo...@gmail.com> wrote:
>
> > ...I was of course thinking the same thing about the fact this really
> > does violate the point of the flow based programming construct.  But I
> > only think it is the implementation that violates this.  Not the idea.
> > The idea is that some portions of the flow should have higher priority
> > than others for execution/queuing.  It is the implementation which does
> > this by coupling logic of how a queue behaves with the fact that higher
> > priority data lives elsewhere and it then artificially says 'no work
> > right now go do something more important'.  If we stay focused on the
> > spirit of the idea I think there is something really interesting here
> > that is additive/beneficial to the FBP constructs.
> >
> > I am slightly emboldened here in that having spoken with J Paul Morrison
> > about NiFi and FBP he felt like we had done some things which were
> > additive/beneficial to FBP and felt we were more like Classical FBP than
> > many other things even though we've not implemented named ports for
> > components (yet).
> >
> > That said, I do also feel like the real core need Jon's idea addresses is
> > possibly better served by it being easier to deploy resource constrained
> > NiFi Clusters or having things like stateless-nifi as an executor.
> >
> > In any event - this is a good discussion area and I'm looking forward to
> > seeing how this evolves.
> >
> >
> >
> > On Fri, Oct 18, 2019 at 1:50 PM Andy LoPresto <al...@apache.org>
> > wrote:
> >
> >> This is an interesting idea to solve an admitted problem, but I wonder
> >> how it comports with the core tenets of Flow Based Programming on which
> >> NiFi is modeled. This seems to introduce globally-coupled dependencies
> >> between all queues in a flow, where another solution (flow segment-based
> >> resource allocation) might solve this problem without requiring
> >> per-queue contention on every cycle. I think with the stateless NiFi
> >> work there has been some discussion around being able to control the
> >> resource allocation for a flow segment. Sam Hjemfelt, any thoughts here?
> >>
> >>
> >> Andy LoPresto
> >> alopresto@apache.org
> >> alopresto.apache@gmail.com
> >> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
> >>
>
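
The bucket-per-rule design in the quoted goals above can be illustrated with a small sketch. This is not the prototype's code and not NiFi's FlowFileQueue API: it is a toy Python model, and the names Rule, BucketQueue, evaluate_on_insert, and the "unranked" queue for FlowFiles that have no bucket are all assumptions made for illustration. Serving unranked data last in poll() is also an assumption, since the thread does not specify that ordering.

```python
from collections import deque


class Rule:
    """A named priority rule; `predicate` stands in for an Expression Language rule."""

    def __init__(self, name, predicate):
        self.name = name
        self.predicate = predicate


class BucketQueue:
    """Toy connection queue: one FIFO bucket per rule, no sorting inside a bucket."""

    def __init__(self, rules, evaluate_on_insert):
        self.rules = list(rules)  # ordered from highest to lowest priority
        self.evaluate_on_insert = evaluate_on_insert  # models a BucketPrioritizer being set
        self.buckets = {rule.name: deque() for rule in self.rules}
        self.unranked = deque()  # assumption: holds FlowFiles that have no bucket

    def _first_match(self, flowfile):
        # The highest ranked rule that the FlowFile satisfies wins.
        for rule in self.rules:
            if rule.predicate(flowfile["attributes"]):
                return rule.name
        return None

    def put(self, flowfile):
        bucket = flowfile.get("bucket")
        # A remembered bucket whose rule no longer exists forces re-evaluation,
        # even on a connection without a prioritizer.
        stale = bucket is not None and bucket not in self.buckets
        if self.evaluate_on_insert or stale:
            bucket = self._first_match(flowfile)
            flowfile["bucket"] = bucket
        if bucket is None:
            self.unranked.append(flowfile)
        else:
            self.buckets[bucket].append(flowfile)

    def poll(self):
        # Walk buckets from highest to lowest priority; each bucket is plain FIFO.
        for rule in self.rules:
            if self.buckets[rule.name]:
                return self.buckets[rule.name].popleft()
        return self.unranked.popleft() if self.unranked else None
```

Because the bucket name travels with the FlowFile, a downstream queue built with evaluate_on_insert=False can place it directly without consulting the rules again, which is the saving the proposal describes.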

Re: Re: Re: [Discuss] Data prioritization - A proposed solution

Posted by Mark Bean <ma...@gmail.com>.
First, I think this idea is fantastic. The ability to move certain
high-priority data through the flow faster than other data is a crucial
feature which is currently missing - at least in an efficient
implementation.

I had one thought on getting away from the globally-coupled queue problem.
Perhaps each process group can have a property to configure its own queue
implementation. Obviously, the implementation would still be shared across
multiple components, but would minimize the scope. Also, this allows
multi-tenant authorization for configuring the queue implementation.

There are additional details that need to be worked out such as logistics
as well as efficiencies of migrating a flowfile from one queue to another
should it cross a process group boundary that changes the queue
implementation. However, this addresses at least some of the concerns of
FBP.

Jon, can you comment on whether you see this as possible? Does the existence
of any non-priority-based queue undermine the controller's ability to
effectively allocate threads in a manner that raises priority? Would a
prioritized queue within a single process group still behave as you intend
(within its own scope) even if not quite as efficient as if it were
coordinated across the entire flow?

-Mark

On Fri, Oct 18, 2019 at 11:37 PM Kessler, Jon <Ke...@varentech.com>
wrote:

> Understood, I will have a Jira ticket/PR early next week and we can go
> from there.
>
> - Jon
> ________________________________
> From: Joe Witt <jo...@gmail.com>
> Sent: Friday, October 18, 2019 3:29:01 PM
> To: dev@nifi.apache.org
> Subject: EXT: Re: Re: [Discuss] Data prioritization - A proposed solution
>
> I should clarify a bit....It is a perfectly fine thing to have an
> alternative queue implementation which will have awareness that other high
> priority data exists elsewhere and thus will intentionally respond as if
> data is not available on a given queue so that other queues with higher
> priority data will be serviced first.  This will be helpful for ensuring
> highest priority data gets worked first.
>
> My concerns relating to FBP purity as Andy brought up are important
> considerations but they would not necessarily block the inclusion of such
> an implementation.  We need to see it and talk through it for its own sake
> - not FBP's.
>
> At this stage I think you see there are plenty of folks interested in what
> this could mean.  It is clearly touching on something that could benefit
> from new ideas/improvement.  Hopefully you choose to make a PR/JIRA for it
> as that is probably the next best step.

Re: Re: Re: [Discuss] Data prioritization - A proposed solution

Posted by "Kessler, Jon" <Ke...@varentech.com>.
Understood, I will have a Jira ticket/PR early next week and we can go from there.

- Jon
________________________________
From: Joe Witt <jo...@gmail.com>
Sent: Friday, October 18, 2019 3:29:01 PM
To: dev@nifi.apache.org
Subject: EXT: Re: Re: [Discuss] Data prioritization - A proposed solution

I should clarify a bit....It is a perfectly fine thing to have an
alternative queue implementation which will have awareness that other high
priority data exists elsewhere and thus will intentionally respond as if
data is not available on a given queue so that other queues with higher
priority data will be serviced first.  This will be helpful for ensuring
highest priority data gets worked first.

My concerns relating to FBP purity as Andy brought up are important
considerations but they would not necessarily block the inclusion of such
an implementation.  We need to see it and talk through it for its own sake
- not FBP's.

At this stage I think you see there are plenty of folks interested in what
this could mean.  It is clearly touching on something that could benefit
from new ideas/improvement.  Hopefully you choose to make a PR/JIRA for it
as that is probably the next best step.



Re: Re: [Discuss] Data prioritization - A proposed solution

Posted by Joe Witt <jo...@gmail.com>.
I should clarify a bit....It is a perfectly fine thing to have an
alternative queue implementation which will have awareness that other high
priority data exists elsewhere and thus will intentionally respond as if
data is not available on a given queue so that other queues with higher
priority data will be serviced first.  This will be helpful for ensuring
highest priority data gets worked first.

My concerns relating to FBP purity as Andy brought up are important
considerations but they would not necessarily block the inclusion of such
an implementation.  We need to see it and talk through it for its own sake
- not FBP's.

At this stage I think you see there are plenty of folks interested in what
this could mean.  It is clearly touching on something that could benefit
from new ideas/improvement.  Hopefully you choose to make a PR/JIRA for it
as that is probably the next best step.
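
As a rough illustration of the behavior described in this message, a queue that responds as if it had no data while higher priority data is active elsewhere might look like the sketch below. It is a toy Python model under stated assumptions, not the proposed implementation: StaggeredQueue, note_higher_priority_move, yield_percent, and the injectable clock and rng parameters are hypothetical names, and the 5 second window is taken from the original proposal.

```python
import random
import time
from collections import deque


class StaggeredQueue:
    """Toy queue that sometimes pretends to be empty (names are hypothetical)."""

    def __init__(self, yield_percent, window_seconds=5.0,
                 clock=time.monotonic, rng=random.random):
        self.yield_percent = yield_percent    # X: how often data is provided, 0..100
        self.window_seconds = window_seconds  # what counts as "recently"
        self.clock = clock                    # injectable so the behavior is testable
        self.rng = rng                        # returns a float in [0, 1)
        self.items = deque()
        self.last_higher_priority_move = None

    def note_higher_priority_move(self):
        # In a real system this would come from state shared across queues.
        self.last_higher_priority_move = self.clock()

    def put(self, item):
        self.items.append(item)

    def poll(self):
        if not self.items:
            return None
        last = self.last_higher_priority_move
        recently = last is not None and self.clock() - last <= self.window_seconds
        if recently and self.rng() * 100.0 >= self.yield_percent:
            return None  # pretend to be empty so the thread goes back to the pool
        return self.items.popleft()
```

With yield_percent=100 the queue never holds data back, and with yield_percent=0 it holds everything back for as long as higher priority data keeps moving, so the value chosen per rule trades latency for low priority data against thread availability for high priority data.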



On Fri, Oct 18, 2019 at 1:59 PM Joe Witt <jo...@gmail.com> wrote:

> ...I was of course thinking the same thing about the fact this really does
> violate the point of the flow based programming construct.  But I only
> think it is the implementation that violates this.  Not the idea.  The idea
> is that some portions of the flow should have higher priority than others
> for execution/queuing.  It is the implementation which does this by
> coupling logic of how a queue behaves with the fact that higher priority
> data lives elsewhere and it then artificially says 'no work right now go do
> something more important'.  If we stay focused on the spirit of the idea I
> think there is something really interesting here that is
> additive/beneficial to the FBP constructs.
>
> I am slightly emboldened here in that having spoken with J Paul Morrison
> about NiFi and FBP he felt like we had done some things which were
> additive/beneficial to FBP and felt we were more like Classical FBP than
> many other things even though we've not implemented named ports for
> components (yet).
>
> That said, I do also feel like the real core need Jon's idea addresses is
> possibly better served by it being easier to deploy resource constrained
> NiFi Clusters or having things like stateless-nifi as an executor.
>
> In any event - this is a good discussion area and I'm looking forward to
> seeing how this evolves.
>
>
>
> On Fri, Oct 18, 2019 at 1:50 PM Andy LoPresto <al...@apache.org>
> wrote:
>
>> This is an interesting idea to solve an admitted problem, but I wonder
>> how it comports with the core tenets of Flow Based Programming on which
>> NiFi is modeled. This seems to introduce globally-coupled dependencies
>> between all queues in a flow, where another solution (flow segment-based
>> resource allocation) might solve this problem without requiring per-queue
>> contention on every cycle. I think with the stateless NiFi work there has
>> been some discussion around being able to control the resource allocation
>> for a flow segment. Sam Hjemfelt, any thoughts here?
>>
>>
>> Andy LoPresto
>> alopresto@apache.org
>> alopresto.apache@gmail.com
>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>>
>> > On Oct 17, 2019, at 8:54 AM, Kessler, Jon <Ke...@varentech.com>
>> wrote:
>> >
>> > Joe, hopefully I addressed all of your questions:
>> >
>> > Your interpretation of the scheduling aspect is correct. These queues
>> will pretend to be empty a certain % of the time if higher priority data
>> recently moved elsewhere. That % is configurable on a per rule basis which
>> allows the operator to determine how much to stagger the data associated
>> with each rule. That % is also how the rules are ranked in terms of order
>> of priority. The higher the %, the more often a rule will make use of its
>> threads so the higher its priority is considered to be.
>> >
>> > Administration: You are correct that the ruleset is provided at the
>> flow controller level but will be leveraged by all connections regardless
>> of whether or not they use the BucketPrioritizer (more details on this
>> below). This overall solution only works if all FlowFileQueues are of this
>> new implementation which is why I tied it to nifi.properties settings.
>> >
>> > The sorting function here takes place on insertion into any connection
>> on which a BucketPrioritizer is set. Once a FlowFile has been sorted into a
>> bucket we maintain that state so that each time it moves into a new
>> connection we already know in which bucket it should be placed without
>> needing to have a BucketPrioritizer set on that connection. Each bucket in
>> a connection is just a FIFO queue so no additional sorting is done. You
>> should only have to configure connections to use the BucketPrioritizer at
>> points in your flow where you believe you'll have enough information to
>> accurately determine priority but not beyond that point unless you want to
>> re-evaluate downstream for some reason. There is administration involved in
>> setting these BucketPrioritizers on some connections but it should be
>> minimal per flow (sometimes as few as one).
>> >
>> > Some additional information: When you delete a rule, each FlowFile
>> that was already associated with that rule will be re-evaluated against
>> the ruleset the next time it moves, when it enters the next connection,
>> regardless of whether or not a BucketPrioritizer was set on that
>> connection. Also, FlowFiles that have yet to be evaluated (have yet to
>> encounter a BucketPrioritizer) will not be staggered. This was a design
>> decision: if we don't know the priority of a given FlowFile, we
>> should get it to that point in the flow as soon as possible. This decision
>> was a result of empirical evidence that when we did stagger unevaluated
>> data, an incoming flow of high priority data slowed its own upstream
>> processing down once it was identified and processed as high priority.
>> >
>> > Multi-tenancy: Agreed that a global priority list could be too
>> restrictive for multi-tenancy and should be addressed.
>> >
>> > Per swapping, this is an area where I admittedly need to put more
>> thought into my implementation because there is plenty of room for
>> improvement. Right now I'm just swapping files to disk in order of least to
>> greatest priority but they are all stored together. Therefore they're read
>> back into memory in order of least to greatest priority. More work should
>> be done here.
>> >
>> >  - Jon

Re: Re: [Discuss] Data prioritization - A proposed solution

Posted by Joe Witt <jo...@gmail.com>.
...I was of course thinking the same thing about the fact this really does
violate the point of the flow based programming construct.  But I only
think it is the implementation that violates this.  Not the idea.  The idea
is that some portions of the flow should have higher priority than others
for execution/queuing.  It is the implementation which does this by
coupling logic of how a queue behaves with the fact that higher priority
data lives elsewhere and it then artificially says 'no work right now go do
something more important'.  If we stay focused on the spirit of the idea I
think there is something really interesting here that is
additive/beneficial to the FBP constructs.

I am slightly emboldened here in that having spoken with J Paul Morrison
about NiFi and FBP he felt like we had done some things which were
additive/beneficial to FBP and felt we were more like Classical FBP than
many other things even though we've not implemented named ports for
components (yet).

That said, I do also feel like the real core need Jon's idea addresses is
possibly better served by it being easier to deploy resource constrained
NiFi Clusters or having things like stateless-nifi as an executor.

In any event - this is a good discussion area and I'm looking forward to
seeing how this evolves.



On Fri, Oct 18, 2019 at 1:50 PM Andy LoPresto <al...@apache.org> wrote:

> This is an interesting idea to solve an admitted problem, but I wonder how
> it comports with the core tenets of Flow Based Programming on which NiFi is
> modeled. This seems to introduce globally-coupled dependencies between all
> queues in a flow, where another solution (flow segment-based resource
> allocation) might solve this problem without requiring per-queue contention
> on every cycle. I think with the stateless NiFi work there has been some
> discussion around being able to control the resource allocation for a flow
> segment. Sam Hjemfelt, any thoughts here?

Re: Re: [Discuss] Data prioritization - A proposed solution

Posted by Andy LoPresto <al...@apache.org>.
This is an interesting idea to solve an admitted problem, but I wonder how it comports with the core tenets of Flow Based Programming on which NiFi is modeled. This seems to introduce globally-coupled dependencies between all queues in a flow, where another solution (flow segment-based resource allocation) might solve this problem without requiring per-queue contention on every cycle. I think with the stateless NiFi work there has been some discussion around being able to control the resource allocation for a flow segment. Sam Hjemfelt, any thoughts here?


Andy LoPresto
alopresto@apache.org
alopresto.apache@gmail.com
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

> On Oct 17, 2019, at 8:54 AM, Kessler, Jon <Ke...@varentech.com> wrote:
> 
> Joe, hopefully I addressed all of your questions:
> 
> Your interpretation of the scheduling aspect is correct. These queues will pretend to be empty a certain % of the time if higher priority data recently moved elsewhere. That % is configurable on a per rule basis which allows the operator to determine how much to stagger the data associated with each rule. That % is also how the rules are ranked in terms of order of priority. The higher the %, the more often a rule will make use of its threads so the higher its priority is considered to be.
> 
> Administration: You are correct that the ruleset is provided at the flow controller level but will be leveraged by all connections regardless of whether or not they use the BucketPrioritizer (more details on this below). This overall solution only works if all FlowFileQueues are of this new implementation which is why I tied it to nifi.properties settings.
> 
> The sorting function here takes place on insertion into any connection on which a BucketPrioritizer is set. Once a FlowFile has been sorted into a bucket we maintain that state so that each time it moves into a new connection we already know in which bucket it should be placed without needing to have a BucketPrioritizer set on that connection. Each bucket in a connection is just a FIFO queue so no additional sorting is done. You should only have to configure connections to use the BucketPrioritizer at points in your flow where you believe you'll have enough information to accurately determine priority but not beyond that point unless you want to re-evaluate downstream for some reason. There is administration involved in setting these BucketPrioritizers on some connections but it should be minimal per flow (sometimes as few as one).
> 
> Some additional information: When you delete a rule the next time each FlowFile moves that was already associated with that rule it will be re-evaluated against the ruleset when it enters the next connection regardless of whether or not a BucketPrioritizer was set on that connection. Also FlowFiles that have yet to be evaluated (have yet to encounter a BucketPrioritizer) will not be staggered. This was a design decision that if we don't know what a priority is for a given FlowFile we should get it to that point in the flow as soon as possible. This decision was a result of empirical evidence that when we did stagger unevaluated data an incoming flow of high priority data slowed its own upstream processing down once it was identified and processed as high priority.
> 
> Multi-tenancy: Agreed that a global priority list could be too restrictive for multi-tenancy and should be addressed.
> 
> Per swapping, this is an area where I admittedly need to put more thought into my implementation because there is plenty of room for improvement. Right now I'm just swapping files to disk in order of least to greatest priority but they are all stored together. Therefore they're read back into memory in order of least to greatest priority. More work should be done here.
> 
>  - Jon

Re: Re: [Discuss] Data prioritization - A proposed solution

Posted by "Kessler, Jon" <Ke...@varentech.com>.
Joe, hopefully I addressed all of your questions:

Your interpretation of the scheduling aspect is correct. These queues will pretend to be empty a certain % of the time if higher priority data recently moved elsewhere. That % is configurable on a per rule basis which allows the operator to determine how much to stagger the data associated with each rule. That % is also how the rules are ranked in terms of order of priority. The higher the %, the more often a rule will make use of its threads so the higher its priority is considered to be.
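
A minimal sketch of the staggering described above, for illustration only. The class and member names (StaggeredQueue, releaseFraction, poll) are hypothetical and are not taken from NiFi's FlowFileQueue API or from the prototype; FlowFiles are represented by plain string ids, and the random source is injected so the behaviour can be exercised deterministically.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.DoubleSupplier;

// Illustrative only: StaggeredQueue and its members are hypothetical names,
// not part of NiFi's FlowFileQueue API or of the prototype.
class StaggeredQueue {
    // "Recently" is 5 seconds in the proposal.
    static final long RECENT_WINDOW_MILLIS = 5_000L;

    private final Queue<String> flowFileIds = new ArrayDeque<>();
    private final double releaseFraction;  // the rule's X%, as a value in [0.0, 1.0]
    private final DoubleSupplier random;   // yields values in [0.0, 1.0); injected for testing

    StaggeredQueue(double releaseFraction, DoubleSupplier random) {
        this.releaseFraction = releaseFraction;
        this.random = random;
    }

    void offer(String flowFileId) {
        flowFileIds.add(flowFileId);
    }

    // millisSinceHigherPriorityMove: time since higher priority data last moved
    // anywhere in the flow (Long.MAX_VALUE if it never has).
    // Returns null, i.e. pretends to be empty, when this poll is staggered.
    String poll(long millisSinceHigherPriorityMove) {
        boolean higherPriorityMovedRecently =
                millisSinceHigherPriorityMove <= RECENT_WINDOW_MILLIS;
        if (higherPriorityMovedRecently && random.getAsDouble() >= releaseFraction) {
            return null;
        }
        return flowFileIds.poll();
    }
}
```

With `new StaggeredQueue(0.25, Math::random)`, a queue whose rule is set to 25% would hand out a FlowFile on roughly one poll in four while higher priority data is moving, and on every poll otherwise.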

Administration: You are correct that the ruleset is provided at the flow controller level but will be leveraged by all connections regardless of whether or not they use the BucketPrioritizer (more details on this below). This overall solution only works if all FlowFileQueues are of this new implementation which is why I tied it to nifi.properties settings.

The sorting function here takes place on insertion into any connection on which a BucketPrioritizer is set. Once a FlowFile has been sorted into a bucket we maintain that state so that each time it moves into a new connection we already know in which bucket it should be placed without needing to have a BucketPrioritizer set on that connection. Each bucket in a connection is just a FIFO queue so no additional sorting is done. You should only have to configure connections to use the BucketPrioritizer at points in your flow where you believe you'll have enough information to accurately determine priority but not beyond that point unless you want to re-evaluate downstream for some reason. There is administration involved in setting these BucketPrioritizers on some connections but it should be minimal per flow (sometimes as few as one).
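
The bucket behaviour described above could look roughly like the following sketch. Everything here is an assumption made for illustration: SketchFlowFile and BucketQueue are hypothetical names, the evaluateOnInsert flag stands in for "a BucketPrioritizer is set on this connection", rules are plain predicates rather than expression language, unevaluated FlowFiles are placed in a trailing default bucket, and poll() drains the highest-ranked non-empty bucket first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical stand-in for a FlowFile: attributes plus a sticky bucket index.
class SketchFlowFile {
    final Map<String, String> attributes;
    Integer bucket; // null until the FlowFile has been evaluated against the ruleset

    SketchFlowFile(Map<String, String> attributes) {
        this.attributes = attributes;
    }
}

// Hypothetical queue: one FIFO per rule plus a default bucket at the end.
class BucketQueue {
    private final List<Predicate<SketchFlowFile>> rules; // highest priority first
    private final List<Queue<SketchFlowFile>> buckets = new ArrayList<>();
    private final boolean evaluateOnInsert; // stands in for "BucketPrioritizer is set"

    BucketQueue(List<Predicate<SketchFlowFile>> rules, boolean evaluateOnInsert) {
        this.rules = rules;
        this.evaluateOnInsert = evaluateOnInsert;
        for (int i = 0; i <= rules.size(); i++) {
            buckets.add(new ArrayDeque<>());
        }
    }

    void offer(SketchFlowFile flowFile) {
        if (evaluateOnInsert) {
            flowFile.bucket = firstMatchingRule(flowFile);
        }
        // Reuse the remembered bucket; unevaluated FlowFiles go to the default bucket.
        int index = flowFile.bucket != null ? flowFile.bucket : rules.size();
        buckets.get(index).add(flowFile); // FIFO append, no sorting
    }

    SketchFlowFile poll() {
        for (Queue<SketchFlowFile> bucket : buckets) {
            SketchFlowFile next = bucket.poll();
            if (next != null) {
                return next;
            }
        }
        return null;
    }

    private int firstMatchingRule(SketchFlowFile flowFile) {
        for (int i = 0; i < rules.size(); i++) {
            if (rules.get(i).test(flowFile)) {
                return i;
            }
        }
        return rules.size(); // no rule matched: default bucket
    }
}
```

The point of the sketch is that insertion is a constant-time append to one FIFO, and that a downstream queue created with evaluateOnInsert set to false still places a FlowFile in the right bucket because the index travels with it.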

Some additional information: When you delete a rule, each FlowFile that was already associated with that rule will be re-evaluated against the ruleset the next time it moves, when it enters its next connection, regardless of whether or not a BucketPrioritizer is set on that connection. Also, FlowFiles that have yet to be evaluated (have yet to encounter a BucketPrioritizer) will not be staggered. This was a design decision: if we don't know what the priority is for a given FlowFile, we should get it to that point in the flow as soon as possible. This decision was a result of empirical evidence that when we did stagger unevaluated data, an incoming flow of high priority data slowed its own upstream processing down once it was identified and processed as high priority.
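
The re-evaluation rule for deleted rules can be sketched as follows. This is an assumption-laden illustration, not the prototype's code: RuleSet and ruleOnEnqueue are hypothetical names, rules are keyed by a string id and modelled as predicates over an attribute map, and a null rule id means the FlowFile has not yet been evaluated.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical global ruleset; insertion order is priority order (highest first).
class RuleSet {
    private final Map<String, Predicate<Map<String, String>>> rules = new LinkedHashMap<>();

    void add(String ruleId, Predicate<Map<String, String>> rule) {
        rules.put(ruleId, rule);
    }

    void delete(String ruleId) {
        rules.remove(ruleId);
    }

    // Returns the id of the highest-ranked matching rule, or null if none match.
    String evaluate(Map<String, String> attributes) {
        for (Map.Entry<String, Predicate<Map<String, String>>> entry : rules.entrySet()) {
            if (entry.getValue().test(attributes)) {
                return entry.getKey();
            }
        }
        return null;
    }

    // Decides which rule a FlowFile carries after entering a connection.
    // currentRuleId is the rule it arrived with (null if never evaluated).
    String ruleOnEnqueue(String currentRuleId, Map<String, String> attributes,
                         boolean prioritizerSet) {
        boolean ruleWasDeleted = currentRuleId != null && !rules.containsKey(currentRuleId);
        if (prioritizerSet || ruleWasDeleted) {
            return evaluate(attributes);
        }
        return currentRuleId; // keep the remembered rule; stays null if unevaluated
    }
}
```

A FlowFile that arrives with a null rule id at a connection without a prioritizer keeps the null id, which is the case the queue would treat as "do not stagger".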

Multi-tenancy: Agreed that a global priority list could be too restrictive for multi-tenancy and should be addressed.

Per swapping, this is an area where I admittedly need to put more thought into my implementation because there is plenty of room for improvement. Right now I'm just swapping files to disk in order of least to greatest priority but they are all stored together. Therefore they're read back into memory in order of least to greatest priority. More work should be done here.

  - Jon
________________________________
From: Joe Witt <jo...@gmail.com>
Sent: Thursday, October 17, 2019 8:12:52 AM
To: dev@nifi.apache.org
Subject: EXT: Re: [Discuss] Data prioritization - A proposed solution

Jon

Probably some details I don't quite understand yet so responses here are to
get there...

The concept for scheduling is interesting.  Does this basically work around
the fact that we have an unfair scheduler so this has queue implementations
which pretend data is not available when it knows that there is higher
priority data available elsewhere thus returning more threads to the pool
faster to increase the likelihood that queues with higher priority data
will get served more often?

The notion of prioritization implies there is a sorting function happening
somewhere.  NiFi now does sorting on insertion to every queue.  At what
points are you suggesting sorting can be done/reduced to?

Administration: The existing model does require each prioritizer to be set
for each queue.  Yours does as well - to opt into this you'd have to select
the BucketPrioritizer right?  It seems like you're saying the priority
ruleset would be provided at the flow controller level and be enforced by
all connections which leverage this prioritizer.  For large multi-tenant
NiFi flows having a global ruleset might be too limiting but maybe we just
don't worry about that yet.

How does this idea work with the fact that queues as they reach a given
threshold have their data swapped out to disk and as data gets worked off
the flowfiles get swapped back into memory?

Thanks
Joe


Re: [Discuss] Data prioritization - A proposed solution

Posted by Joe Witt <jo...@gmail.com>.
Jon

There are probably some details I don't quite understand yet, so my
responses here are meant to get there...

The concept for scheduling is interesting.  Does this basically work around
the fact that we have an unfair scheduler?  That is, the queue
implementations pretend data is not available when they know that higher
priority data is available elsewhere, which returns threads to the pool
faster and increases the likelihood that queues with higher priority data
get served more often?
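
To make sure I am reading the proposal correctly, here is a rough Python
sketch of the polling behavior as I understand it. It is not the prototype's
code. SharedPriorityState, ThrottledQueue, rank and yield_percent are names
made up for illustration; the 5 second window and the "X% of the time" rule
come from the proposal, and I am assuming a lower rank number means a higher
priority.

```python
import random
import time
from collections import deque

RECENT_WINDOW_SECONDS = 5.0  # the "within 5s" window from the proposal


class SharedPriorityState:
    """Hypothetical system-wide record of when each priority last moved."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._last_moved = {}  # rank -> timestamp (lower rank = higher priority)

    def record_move(self, rank):
        self._last_moved[rank] = self._clock()

    def higher_priority_moved_recently(self, rank):
        now = self._clock()
        return any(
            other < rank and now - moved_at <= RECENT_WINDOW_SECONDS
            for other, moved_at in self._last_moved.items()
        )


class ThrottledQueue:
    """Queue holding data of one priority that sometimes reports 'no data'."""

    def __init__(self, rank, yield_percent, state, rng=random.random):
        self.rank = rank
        self.yield_percent = yield_percent  # the X defined along with the rule
        self.state = state
        self.rng = rng  # injectable so the behavior can be tested
        self.items = deque()

    def put(self, flowfile):
        self.items.append(flowfile)

    def poll(self):
        if not self.items:
            return None
        if self.state.higher_priority_moved_recently(self.rank):
            # Higher priority data moved elsewhere recently: only provide a
            # FlowFile X% of the time, otherwise pretend nothing is available
            # so the thread goes back to the pool.
            if self.rng() * 100 >= self.yield_percent:
                return None
        self.state.record_move(self.rank)
        return self.items.popleft()
```

If that matches the intent, then a low priority queue with X=20 would hand
out data on roughly one poll in five while higher priority data is flowing,
and on every poll once that data has been quiet for more than 5 seconds.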

The notion of prioritization implies there is a sorting function happening
somewhere.  NiFi currently sorts on insertion into every queue.  At what
points are you suggesting sorting would be done, or reduced to?
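
For comparison, my reading of the bucket approach is something like the
Python sketch below. Again this is only an illustration under assumptions:
BucketQueue and its methods are hypothetical names, rules are plain Python
predicates standing in for expression language rules, and the extra last
bucket for FlowFiles that match no rule is my own guess at how unmatched
data would be handled.

```python
from collections import deque


class BucketQueue:
    """One FIFO bucket per priority rule instead of one sorted queue."""

    def __init__(self, rules):
        # rules: predicates over FlowFile attributes, highest priority first
        self.rules = list(rules)
        # One bucket per rule, plus a final bucket for "matched no rule"
        # (the final bucket is an assumption, not part of the proposal).
        self.buckets = [deque() for _ in range(len(self.rules) + 1)]

    def classify(self, attributes):
        # The highest ranked rule that the FlowFile satisfies wins.
        for index, predicate in enumerate(self.rules):
            if predicate(attributes):
                return index
        return len(self.rules)

    def put(self, attributes, bucket=None):
        # Reuse a bucket assigned upstream; only evaluate the rules when the
        # caller has no cached bucket (or wants a re-evaluation).
        if bucket is None:
            bucket = self.classify(attributes)
        self.buckets[bucket].append(attributes)
        return bucket

    def poll(self):
        # No comparison sort: take from the first non-empty bucket.
        for index, bucket in enumerate(self.buckets):
            if bucket:
                return bucket.popleft(), index
        return None
```

In this reading, insertion costs one pass over the rules at most (and
nothing when the bucket is carried along from the previous connection), and
poll() returns the FlowFile together with its bucket index so the next
connection can pass it straight back into put().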

Administration: The existing model does require each prioritizer to be set
for each queue.  Yours does as well - to opt into this you'd have to select
the BucketPrioritizer, right?  It seems like you're saying the priority
ruleset would be provided at the flow controller level and be enforced by
all connections which leverage this prioritizer.  For large multi-tenant
NiFi flows having a global ruleset might be too limiting, but maybe we just
don't worry about that yet.

How does this idea work with the fact that queues, as they reach a given
threshold, have their data swapped out to disk, and as data gets worked off
the flowfiles get swapped back into memory?

Thanks
Joe

On Thu, Oct 17, 2019 at 7:28 AM Kessler, Jon <Ke...@varentech.com> wrote:

> I want to start a discussion about a new prioritization mechanism that
> addresses some of the issues that I believe exist in the current solution.
> These issues are:
>
>  - Scheduling: No consideration is given to data priority when determining
> which component is given the next available thread with which to work
>  - Constant sorting: Because all flowfiles in a given connection share the
> same PriorityQueue they must be sorted every time they move. While this
> sort is efficient it can add up as queues grow deep.
>  - Administration: There is a costly human element to managing the value
> used as a priority ranking as priorities change. You must also ensure every
> connection in the appropriate flow has the proper prioritizer assigned to
> it to make use of the property.
>
> We have developed a prototype of a new FlowFileQueue implementation that
> addresses these issues. Use of this implementation is controlled via
> nifi.properties so you can opt-in or out system-wide without doing a lot of
> configuration. Its design goals are:
>
>   - Instead of using the value of a FlowFile attribute as a ranking,
> maintain a set of expression language rules to define your priorities. The
> highest ranked rule that a given FlowFile satisfies will be that FlowFile's
> priority
>   - Because we have a finite set of priority rules we can utilize a bucket
> sort in our connections. One bucket per priority rule. The bucket/rule with
> which a FlowFile is associated will be maintained so that as it moves
> through the system we do not have to re-evaluate that FlowFile against our
> ruleset unless we have reason to do so.
>   - Control where in your flow FlowFiles are evaluated against the ruleset
> with a new Prioritizer implementation: BucketPrioritizer.
>   - When this queue implementation is polled it will be able to check
> state to see if any data of a higher priority than what it currently
> contains recently (within 5s) moved elsewhere in the system. If higher
> priority data has recently moved elsewhere, the connection will only
> provide a FlowFile X% of the time where X is defined along with the rule.
> This allows higher priority data to have more frequent access to threads
> without thread-starving lower priority data.
>   - Rules will be managed via a menu option for the flow and changes to
> them take effect instantly. This allows you to change your priorities
> without stopping/editing/restarting various components on the graph.
>
> I intend to contribute this solution but first want to solicit input and
> opinions.
>
>   - Jon Kessler
>
>