Posted to issues@nifi.apache.org by "Mark Payne (JIRA)" <ji...@apache.org> on 2018/05/31 18:35:00 UTC

[jira] [Commented] (NIFI-4026) SiteToSite Partitioning

    [ https://issues.apache.org/jira/browse/NIFI-4026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496981#comment-16496981 ] 

Mark Payne commented on NIFI-4026:
----------------------------------

[~pvillard] thanks for bringing this up - it's a nice feature to have. I do think we need to be careful about a few things, though:

I would not try to allow for any sort of scripting language to be used for a hashing function. Instead, we should just do something like `attributeOfInterest.hashCode() % numberOfNodes`. If scripting is needed, users could use ExecuteScript or something similar to generate the value to use for that 'attribute of interest'. We certainly do not want to add any dependencies into the framework for scripting languages, and even the user experience around providing that as an option, I think, complicates things more than it needs to.
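
To make that concrete, here is a minimal sketch of the calculation (the class and parameter names are hypothetical, not existing NiFi API; `Math.floorMod` is used only so that a negative hashCode doesn't produce a negative index):

```java
// Hypothetical sketch of the hashing idea above, not actual NiFi code.
public class AttributePartitioner {

    /**
     * Maps the configured 'attribute of interest' to a node index in [0, numberOfNodes).
     * Math.floorMod keeps the result non-negative even when hashCode() is negative.
     */
    public static int partition(final String attributeValue, final int numberOfNodes) {
        if (attributeValue == null || numberOfNodes < 1) {
            return 0;
        }
        return Math.floorMod(attributeValue.hashCode(), numberOfNodes);
    }
}
```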

I do not know that this would necessarily result in a batch of 1 - we could certainly create multiple transactions for multiple destination nodes.
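
As a rough illustration of the multiple-transaction idea (FlowFile is the standard NiFi API type, but the binning class and parameter names here are assumptions made up for this sketch):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.nifi.flowfile.FlowFile;

// Hypothetical sketch, not actual Site-to-Site client code: group flow files by their
// computed destination so that one transaction per destination node can be created,
// rather than forcing a batch size of 1.
public class DestinationBinner {

    public static Map<Integer, List<FlowFile>> binByDestination(final List<FlowFile> flowFiles,
            final String partitionAttribute, final int numberOfNodes) {

        final Map<Integer, List<FlowFile>> bins = new HashMap<>();
        for (final FlowFile flowFile : flowFiles) {
            final String value = flowFile.getAttribute(partitionAttribute);
            final int nodeIndex = Math.floorMod(value == null ? 0 : value.hashCode(), numberOfNodes);
            bins.computeIfAbsent(nodeIndex, i -> new ArrayList<>()).add(flowFile);
        }
        return bins; // each bin would then be sent in its own transaction
    }
}
```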

Regarding node scaling: I am okay, at least initially, with saying that scaling up or down may result in 'rebalancing' which hash values belong to which node, as long as this is well documented. However, a case that I do think is important to solve and cannot be ignored is that of nodes connecting and disconnecting. This occurs much more frequently than nodes being added to or removed from a cluster. To handle this properly, I think the site-to-site protocol would have to be updated so that when a client asks for the nodes in a cluster, the response includes all nodes regardless of their Connection Status, along with each node's Connection Status, so that the client doesn't try to send data to a disconnected node. By including Connecting, Disconnecting, and Disconnected nodes, we can avoid sending data to the wrong destination just because a node in the destination cluster temporarily disconnected.
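
A sketch of how a client might use such a response (the types below are illustrative, not the existing Site-to-Site protocol classes): the hash is computed against the full node list, including disconnected nodes, so the mapping stays stable, and transmission simply waits when the chosen node is not currently connected.

```java
import java.util.List;

// Hypothetical sketch, not existing protocol classes.
public class StatusAwareSelection {

    enum ConnectionState { CONNECTED, CONNECTING, DISCONNECTING, DISCONNECTED }

    static final class NodeDescription {
        final String nodeIdentifier;
        final ConnectionState state;

        NodeDescription(final String nodeIdentifier, final ConnectionState state) {
            this.nodeIdentifier = nodeIdentifier;
            this.state = state;
        }
    }

    /**
     * Hashes against ALL nodes (any connection status) so the attribute-to-node mapping
     * does not shift when a node temporarily drops out. Returns null when the chosen
     * node is not connected, meaning the data should be held rather than re-routed.
     */
    public static NodeDescription select(final List<NodeDescription> allNodes, final String attributeValue) {
        final int index = Math.floorMod(attributeValue.hashCode(), allNodes.size());
        final NodeDescription chosen = allNodes.get(index);
        return chosen.state == ConnectionState.CONNECTED ? chosen : null;
    }
}
```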

Additionally, I think it should be a requirement that, in order to do this, when a node is notified of a change to the cluster topology, it persists that information rather than just updating an in-memory representation. This is needed to address the case where the destination cluster is restarted. If the destination cluster has, say, 10 nodes and they are restarted, a client may request the cluster topology and find that there are only 2 nodes in the cluster. Currently that's okay: it will send data to those 2 nodes, realize a minute or so later that there are now 6 nodes, then 8 nodes, and finally 10 nodes. However, this would cause a lot of problems for a use case where we actually need to know the size of the whole cluster. If we persist this information on the nodes, then even upon restart we know how many nodes to expect in the cluster.
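
A minimal sketch of what persisting the topology could look like (the file location, format, and class name are assumptions for illustration, not an existing NiFi mechanism):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: store the last-known set of node identifiers so that, after a
// restart, a node can still report the expected cluster size before all peers rejoin.
public class PersistedTopology {

    private final Path topologyFile;

    public PersistedTopology(final Path topologyFile) {
        this.topologyFile = topologyFile;
    }

    // Called whenever the node is notified of a change to the cluster topology.
    public void save(final Set<String> nodeIdentifiers) throws IOException {
        Files.write(topologyFile, new TreeSet<>(nodeIdentifiers));
    }

    // Called at startup, before the full cluster has reconnected.
    public Set<String> load() throws IOException {
        if (!Files.exists(topologyFile)) {
            return new TreeSet<>();
        }
        return new TreeSet<>(Files.readAllLines(topologyFile));
    }
}
```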

Finally, one implementation detail that I think is worth noting here - once we know the nodes in the cluster, it's important that we sort those nodes by their Node Identifiers, before trying to hash an attribute to determine which node to send the data to. This will ensure that even if the client is restarted, we are able to tie a hash value to the correct node (as opposed to just using a List<Node> that could be populated in a different order each time that we determine the nodes in a cluster).
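
For illustration, that ordering detail could look something like this (the names are hypothetical, and node identifiers are plain strings here):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the stable-mapping detail above: sort by node identifier
// before hashing, so a given attribute value maps to the same node even if the peer
// list is returned in a different order after a client restart.
public class StableNodeMapping {

    public static String nodeFor(final String attributeValue, final List<String> nodeIdentifiers) {
        final List<String> sorted = new ArrayList<>(nodeIdentifiers);
        Collections.sort(sorted); // deterministic order regardless of how the list was populated
        final int index = Math.floorMod(attributeValue.hashCode(), sorted.size());
        return sorted.get(index);
    }
}
```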

> SiteToSite Partitioning
> -----------------------
>
>                 Key: NIFI-4026
>                 URL: https://issues.apache.org/jira/browse/NIFI-4026
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Pierre Villard
>            Priority: Major
>
> To address some use cases and provide more flexibility in the Site-to-Site mechanism, it would be interesting to introduce an S2S Partitioning Key.
> The idea would be to add a parameter to the S2S configuration to compute the destination node based on an attribute of a flow file. The user would set the attribute to read from the incoming flow files, and a hashing function would be applied to this attribute value to get a number between 1 and N (N being the number of nodes in the remote cluster) to select the destination node.
> It could even be possible to let the user code a custom hashing function in a scripting language.
> This approach would potentially force the “batch” size to 1, or it could be necessary to create bins to batch together flow files that are supposed to go to the same node.
> Obviously, this raises the question of how to handle cluster scale up/down. However, I believe this is an edge case and should not block this feature.
> Some of the use cases could be:
> - better load balancing of the flow files when using the List/Fetch pattern (example: ListHDFS/FetchHDFS and load balance based on the size of the remote file to fetch)
> - being able to keep data related to the same element on the same node (based on business requirements; example: all the logs from a given host should be merged into the same file rather than having one file per NiFi node)
> - give the possibility to send all the data back to the primary node (we could say that if the hash function returns 0, then the destination node is the primary node) in case this is required for specific operations. This would avoid having to run the full workflow on the primary node when some parts could be load balanced.
> I also think that this work would be a good foundation for the "node labeling" stuff that has been discussed on the mailing lists in the past.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)