Posted to dev@nifi.apache.org by Ryan Hendrickson <ry...@gmail.com> on 2020/06/24 18:01:19 UTC

NiFi PriorityAttributePrioritizer doesn't prioritize all items in a queue.

Hello,
   We've noticed that the PriorityAttributePrioritizer doesn't prioritize
all items in a queue.

   We'll have a queue with 50,000 items in it, then a 50,001st item will be
added with a #1 priority.  This item won't be prioritized above lower-
priority items in the queue.

   We did some research into this and wanted to confirm what we found...

    For any Relationship there are 2 underlying queues:
       (1) An Active Queue (java.util.PriorityQueue) for the first 20,000
items, defined by the nifi.queue.swap.threshold in nifi.properties.
       (2) A Swap Queue (java.util.ArrayList) for the rest of the
queue's items.

    If the Active Queue is full, every new item, regardless of priority, is
placed on the Swap Queue.  No item on the Swap Queue will be re-prioritized
until the entire Active Queue is empty (Line 284
<https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java#L284>).
Thus, the PriorityAttributePrioritizer only "actively" sorts the first
20,000 items on the relationship.  Once the Active Queue is empty, it'll
re-sort the rest and move the top 20,000 over.  Then the problem repeats.
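
    A minimal sketch of the behavior described above, written as a
simplified model and not as NiFi's actual code.  The class name
TwoTierQueueSketch, the integer priorities, and the tiny threshold of 3 are
assumptions for illustration only; the swap list here lives in memory,
whereas NiFi writes swapped data to disk.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Simplified model of the two-queue behavior described in this message.
// Hypothetical class; it does not reproduce SwappablePriorityQueue.
public class TwoTierQueueSketch {
    // Stands in for nifi.queue.swap.threshold.
    private final int swapThreshold;
    // Sorted "Active Queue"; a lower number means a higher priority.
    private final PriorityQueue<Integer> active =
            new PriorityQueue<>(Comparator.naturalOrder());
    // Unsorted "Swap Queue" holding everything past the threshold.
    private final List<Integer> swap = new ArrayList<>();

    public TwoTierQueueSketch(int swapThreshold) {
        this.swapThreshold = swapThreshold;
    }

    public void put(int priority) {
        if (active.size() >= swapThreshold) {
            // Active Queue is full: the item is parked regardless of priority.
            swap.add(priority);
        } else {
            active.add(priority);
        }
    }

    public Integer poll() {
        if (active.isEmpty() && !swap.isEmpty()) {
            // Only now is the swapped data sorted and its head moved over.
            swap.sort(Comparator.naturalOrder());
            List<Integer> head =
                    swap.subList(0, Math.min(swapThreshold, swap.size()));
            active.addAll(head);
            head.clear(); // removes the moved items from the swap list
        }
        return active.poll(); // null when both queues are empty
    }

    public static void main(String[] args) {
        TwoTierQueueSketch queue = new TwoTierQueueSketch(3);
        for (int i = 0; i < 5; i++) {
            queue.put(5);      // five low-priority items
        }
        queue.put(1);          // one late high-priority item
        StringBuilder order = new StringBuilder();
        Integer next;
        while ((next = queue.poll()) != null) {
            order.append(next).append(' ');
        }
        System.out.println(order.toString().trim()); // prints "5 5 5 1 5 5"
    }
}
```

    In this model the priority-1 item is only served after the three items
already in the Active Queue have drained, which matches what we observed.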

    I hadn't read anywhere in the documentation that this is the case.
Even the admin guide's definition of nifi.queue.swap.threshold doesn't
mention that it affects the PriorityQueue system.  We just raised it from
20,000 to 200,000, but we're all assuming that's going to have detrimental
effects elsewhere.

   If you guys could confirm our suspicions here, we'd appreciate that.
We'd also welcome any suggestions on what to do here, or on how an increased
threshold size negatively impacts the JVM.


Stack we followed to figure this out:

SwappablePriorityQueue.java: doPoll()
SwappablePriorityQueue.java: poll()
StandardFlowFileQueue.java: queue.poll()
StandardConnection.java: flowFileQueue.poll()
StandardProcessSession.java: conn.poll()
InvokeHttp.java: session.get()

Thanks,
Ryan

Re: NiFi PriorityAttributePrioritizer doesn't prioritize all items in a queue.

Posted by Ryan Hendrickson <ry...@gmail.com>.
Hi Andy,
   Yea, it's the Apache NiFi 1.11.4 version.

Thanks,
Ryan

On Wed, Jun 24, 2020 at 7:06 PM Andy LoPresto <al...@apache.org> wrote:

> Hi Ryan,
>
> Thanks for writing such a detailed report. What version of NiFi are you
> observing this behavior against? I know there were some issues in older
> versions with queue swapping, but since you linked to current code, I’m
> assuming you’re experiencing this on 1.11.4?
>
>
> Andy LoPresto
> alopresto@apache.org
> alopresto.apache@gmail.com
> He/Him
> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>

Re: NiFi PriorityAttributePrioritizer doesn't prioritize all items in a queue.

Posted by Andy LoPresto <al...@apache.org>.
Hi Ryan,

Thanks for writing such a detailed report. What version of NiFi are you observing this behavior against? I know there were some issues in older versions with queue swapping, but since you linked to current code, I’m assuming you’re experiencing this on 1.11.4?


Andy LoPresto
alopresto@apache.org
alopresto.apache@gmail.com
He/Him
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69



Re: NiFi PriorityAttributePrioritizer doesn't prioritize all items in a queue.

Posted by Ryan Hendrickson <ry...@gmail.com>.
Hi Mark,
   Thanks for the confirmation.

   For the documentation, I'd ask that you consider adding something to the
"Configure Connection -> Available Prioritizer" Help Icon.  Right now it
says "Available prioritizers that could reprioritize FlowFiles in this work
queue."  And the Selected Prioritizers one says: "Prioritizers that have been
selected to prioritize FlowFiles in this work queue."  Something to notify
users of the behavior and the NiFi property that controls it would be
helpful.

     Do you know if the default size of 20,000 was chosen for the default
JVM size of 512MB in the bootstrap.conf?  Our JVMs are all set to 32GB.  If
we increase the JVM to something much larger, would you think the threshold
scales as a linear relationship, or is it based more on how many total
connections are on the entire canvas?

   For a long-term improvement, what about providing an override?
Similar to the boxes for "Size Threshold" and "Back Pressure Object
Threshold", could there be one for "Priority Queue Size" that defaults to
20,000 but can be modified on a connection-by-connection basis?
That'd help mitigate what I'm afraid is going to happen now that we've
raised the size from 20,000 to 200,000 for the entire server instead of for
the single queue.
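
   A rough sketch of what I mean by an override, purely hypothetical: none
of these names (ConnectionQueueSettings, withOverride, effectiveThreshold)
exist in NiFi today, and the global default of 20,000 is simply the value
discussed in this thread.

```java
import java.util.OptionalInt;

// Hypothetical per-connection setting that falls back to the global
// swap threshold when no override is configured. Not an existing NiFi API.
public class ConnectionQueueSettings {
    // Mirrors the default of nifi.queue.swap.threshold mentioned in this thread.
    public static final int GLOBAL_SWAP_THRESHOLD = 20_000;

    private final OptionalInt priorityQueueSizeOverride;

    private ConnectionQueueSettings(OptionalInt override) {
        this.priorityQueueSizeOverride = override;
    }

    // Connection without an override: uses the server-wide value.
    public static ConnectionQueueSettings useGlobalDefault() {
        return new ConnectionQueueSettings(OptionalInt.empty());
    }

    // Connection with its own "Priority Queue Size".
    public static ConnectionQueueSettings withOverride(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        return new ConnectionQueueSettings(OptionalInt.of(size));
    }

    // The threshold this connection would actually use.
    public int effectiveThreshold() {
        return priorityQueueSizeOverride.orElse(GLOBAL_SWAP_THRESHOLD);
    }

    public static void main(String[] args) {
        System.out.println(useGlobalDefault().effectiveThreshold());   // prints 20000
        System.out.println(withOverride(200_000).effectiveThreshold()); // prints 200000
    }
}
```

   That way only the one connection that needs strict prioritization would
hold more FlowFiles in memory, and every other connection would keep the
server-wide value.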

Thanks again,
Ryan

On Thu, Jun 25, 2020 at 12:05 PM Mark Payne <ma...@hotmail.com> wrote:

> Ryan,
>
> Yes, your analysis is correct. NiFi will prioritize things in the Active
> Queue. Once swapping starts happening, it can’t constantly rebalance all of
> the FlowFiles, as doing so would mean constantly re-reading the swap data
> from disk and re-writing all of the swap data, which would be prohibitively
> expensive.
>
> I went to find the section of the User Guide that explains this, but
> apparently this isn’t explained in the user guide like I thought it was :(
> Sorry about that. I created a Jira [1] to explain that.
>
> So yes, if you increase the swap threshold, that means that you’ll have a
> larger active queue and as a result you’ll be able to handle the
> prioritization better. But it means that you’ll be holding more FlowFiles
> in memory and as a result can encounter OutOfMemoryError easily if your
> heap is not large enough.
>
> Thanks
> -Mark
>
> [1] https://issues.apache.org/jira/browse/NIFI-7583
>
>

Re: NiFi PriorityAttributePrioritizer doesn't prioritize all items in a queue.

Posted by Mark Payne <ma...@hotmail.com>.
Ryan,

Yes, your analysis is correct. NiFi will prioritize things in the Active Queue. Once swapping starts happening, it can’t constantly rebalance all of the FlowFiles, as doing so would mean constantly re-reading the swap data from disk and re-writing all of the swap data, which would be prohibitively expensive.

I went to find the section of the User Guide that explains this, but apparently this isn’t explained in the user guide like I thought it was :( Sorry about that. I created a Jira [1] to explain that.

So yes, if you increase the swap threshold, that means that you’ll have a larger active queue and as a result you’ll be able to handle the prioritization better. But it means that you’ll be holding more FlowFiles in memory and as a result can encounter OutOfMemoryError easily if your heap is not large enough.

Thanks
-Mark

[1] https://issues.apache.org/jira/browse/NIFI-7583

