You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "Ralph Broenink (Jira)" <ji...@apache.org> on 2022/08/24 07:01:00 UTC

[jira] [Updated] (NIFI-10391) Add option to Load Balance to 'emptiest/fastest node'

     [ https://issues.apache.org/jira/browse/NIFI-10391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ralph Broenink updated NIFI-10391:
----------------------------------
    Description: 
Currently, the connection settings allow for a load balancing option. This load balancing option can be used to distribute FlowFiles to different nodes based on some basic criteria (round robin, single node, or by attribute). Reading from [the design document|https://cwiki.apache.org/confluence/display/NIFI/Load-Balanced+Connections], the considered use case was a ListSFTP processor, that runs on the primary node, and collects many files that can be processed by many different nodes.

Although the load balancing works for a varity of use cases, the use case I run into is a bit different: I have many files, that expand to even more files. Think of zip files that are extracted, or records from large ND-JSON files.

These processors typically run on any node, so any node will have multiple files. However, if you are to use the round robin strategy, you may run into two distinct issues.

 

*Other nodes may have an equal amount of files to process*
In this scenario, the most obvious flaw is that many different nodes may have similar files to work with extracted from similar but different files. In this case, the round robin strategy does not work as intended, as all files will be distributed evenly, though the same amount of files may already be at different nodes. Say in a three-node cluster, each node has 600 files. They will each 'keep' 200 files for themselves and distribute the other 400 to the other nodes, resulting in each having 600 files again. This is completely pointless.

*Distributing files takes time*
I have seen several instances where the network and disk I/O time needed for distributing the load, takes more time than for the 'owning' node to process data. This leaves many files in the connection being distributed to other nodes, whereas keeping them on the 'owning' node would have meant them to have already been processed.

 

I acknowledge that you could argue that 'not load balancing at all' may be an acceptable solution for both issues, However, nodes may be working on other tasks, or one node may have had an exceptionally large compressed file to work with, which you would want to be distributed. The DFM cannot know this in advance, as data may flow in at different moments, and data may have different sizes, and load balancing may generally be a 'good' idea.

In my concrete case, I receive a large file that I ingest into a database. The file is split into smaller chunks to make it easier to work with, and these are distributed to other nodes to split the work evenly. This mostly runs once a day and that is fine, however, sometimes the database is broken and backpressure is generated, sometimes causing multiple files to pile up on different nodes. Additionally, not all data files are created equal, and sometimes the file is very small, and sometimes it is quite large.

 

I therefore propose a load balancing strategy that takes current queue sizes on different nodes into account. I do not know the intricacies of the load balancing framework, and cannot propose a concrete implementation, but I would suppose something like (perhaps a combination of) the following may work:
 * Load balance to a remote queue only if they have zero (or other threshold) files.
 * Load balance to a remote queue where queue prediction says that the queue is getting emptier.
 * Load balance to the 'emptiest' remote queue.
 * Load balance only if files that are being distributed are being processed faster than those remaining on the current node

As said before, I'm not able to make a proper proposal, and this does need some testing to find the best balance between possible strategies. I do feel that a feature such as this could mean that the "load balancing strategy everyone picks" changes to the one we cook up here. I'd also be happy to contribute my thoughts on this.

  was:
Currently, the connection settings allow for a load balancing option. This load balancing option can be used to distribute FlowFiles to different nodes based on some basic criteria (round robin, single node, or by attribute). Reading from [the design document|https://cwiki.apache.org/confluence/display/NIFI/Load-Balanced+Connections], the considered use case was a ListSFTP processor, that runs on the primary node, and collects many files that can be processed by many different nodes.

Although the load balancing works for a varity of use cases, the use case I run into is a bit different: I have many files, that expand to even more files. Think of zip files that are extracted, or records from large ND-JSON files.

These processors typically run on any node, so any node will have multiple files. However, if you are to use the round robin strategy, you may run into two distinct issues.

 

*Other nodes may have an equal amount of files to process*
In this scenario, the most obvious flaw is that many different nodes may have similar files to work with extracted from similar but different files. In this case, the round robin strategy does not work as intended, as all files will be distributed evenly, though the same amount of files may already be at different nodes. Say in a three-node cluster, each node has 600 files. They will each 'keep' 200 files for themselves and distribute the other 400 to the other nodes, resulting in each having 600 files again. This is completely pointless.

*Distributing files takes time*
I have seen several instances where the network and disk I/O time needed for distributing the load, takes more time than for the 'owning' node to process data. This leaves many files in the connection being distributed to other nodes, whereas keeping them on the 'owning' node would have meant them to have already been processed.

 

I acknowledge that you could argue that 'not load balancing at all' may be an acceptable solution for both issues, However, nodes may be working on other tasks, or one node may have had an exceptionally large compressed file to work with, which you would want to be distributed. The DFM cannot know this in advance, as data may flow in at different moments, and data may have different sizes, and load balancing may generally be a 'good' idea.

In my concrete case, I receive a large file that I ingest into a database. The file is split into smaller chunks to make it easier to work with, and these are distributed to other nodes to split the work evenly. This mostly runs once a day and that is fine, however, sometimes the database is broken and backpressure is generated, sometimes causing multiple files to pile up on different nodes. Additionally, not all data files are created equal, and sometimes the file is very small, and sometimes it is quite large.

 

I therefore propose a load balancing strategy that takes current queue sizes on different nodes into account. I do not know the intricacies of the load balancing framework, and cannot propose a concrete implementation, but I would suppose something like (perhaps a combination of) the following may work:
 * Load balance to a remote queue only if they have zero (or other threshold) files.
 * Load balance to a remote queue where queue prediction says that the queue is getting emptier.
 * Load balance to the 'emptiest' remote queue.
 * Load balance only if files that are being distributed are being processed faster than those remaining on the current node

As said before, I'm not able to make a proper proposal, and this does need some testing to find the best balance between possible strategies. I do feel that a feature such as this could mean that the "load balancing strategy everyone picks" changes to the one we cook up here.


> Add option to Load Balance to 'emptiest/fastest node'
> -----------------------------------------------------
>
>                 Key: NIFI-10391
>                 URL: https://issues.apache.org/jira/browse/NIFI-10391
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Ralph Broenink
>            Priority: Major
>
> Currently, the connection settings allow for a load balancing option. This load balancing option can be used to distribute FlowFiles to different nodes based on some basic criteria (round robin, single node, or by attribute). Reading from [the design document|https://cwiki.apache.org/confluence/display/NIFI/Load-Balanced+Connections], the considered use case was a ListSFTP processor, that runs on the primary node, and collects many files that can be processed by many different nodes.
> Although the load balancing works for a varity of use cases, the use case I run into is a bit different: I have many files, that expand to even more files. Think of zip files that are extracted, or records from large ND-JSON files.
> These processors typically run on any node, so any node will have multiple files. However, if you are to use the round robin strategy, you may run into two distinct issues.
>  
> *Other nodes may have an equal amount of files to process*
> In this scenario, the most obvious flaw is that many different nodes may have similar files to work with extracted from similar but different files. In this case, the round robin strategy does not work as intended, as all files will be distributed evenly, though the same amount of files may already be at different nodes. Say in a three-node cluster, each node has 600 files. They will each 'keep' 200 files for themselves and distribute the other 400 to the other nodes, resulting in each having 600 files again. This is completely pointless.
> *Distributing files takes time*
> I have seen several instances where the network and disk I/O time needed for distributing the load, takes more time than for the 'owning' node to process data. This leaves many files in the connection being distributed to other nodes, whereas keeping them on the 'owning' node would have meant them to have already been processed.
>  
> I acknowledge that you could argue that 'not load balancing at all' may be an acceptable solution for both issues, However, nodes may be working on other tasks, or one node may have had an exceptionally large compressed file to work with, which you would want to be distributed. The DFM cannot know this in advance, as data may flow in at different moments, and data may have different sizes, and load balancing may generally be a 'good' idea.
> In my concrete case, I receive a large file that I ingest into a database. The file is split into smaller chunks to make it easier to work with, and these are distributed to other nodes to split the work evenly. This mostly runs once a day and that is fine, however, sometimes the database is broken and backpressure is generated, sometimes causing multiple files to pile up on different nodes. Additionally, not all data files are created equal, and sometimes the file is very small, and sometimes it is quite large.
>  
> I therefore propose a load balancing strategy that takes current queue sizes on different nodes into account. I do not know the intricacies of the load balancing framework, and cannot propose a concrete implementation, but I would suppose something like (perhaps a combination of) the following may work:
>  * Load balance to a remote queue only if they have zero (or other threshold) files.
>  * Load balance to a remote queue where queue prediction says that the queue is getting emptier.
>  * Load balance to the 'emptiest' remote queue.
>  * Load balance only if files that are being distributed are being processed faster than those remaining on the current node
> As said before, I'm not able to make a proper proposal, and this does need some testing to find the best balance between possible strategies. I do feel that a feature such as this could mean that the "load balancing strategy everyone picks" changes to the one we cook up here. I'd also be happy to contribute my thoughts on this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)