Posted to issues@nifi.apache.org by "Bryan Bende (JIRA)" <ji...@apache.org> on 2017/01/09 18:54:58 UTC

[jira] [Commented] (NIFI-3216) Add ability to wait for N signals to Wait/Notify processors

    [ https://issues.apache.org/jira/browse/NIFI-3216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15812529#comment-15812529 ] 

Bryan Bende commented on NIFI-3216:
-----------------------------------

I was hoping to work on this ticket, but I'm not sure I will end up having time, so I wanted to document the design I've been thinking about over the last week...

Originally I was thinking that each signal would be its own entry in the cache. The Wait processor would get all the keys, check whether the number of keys matching the pattern equaled the signal count, and if so release the flow file. The downside to this approach is that it could require a very large number of cache entries. Consider the case where you use SplitText on a 1 million row CSV and want to do something with the original CSV after all 1 million rows have been processed: that would require 1 million entries in the cache.

A better approach would probably be to use a single key (as it does now) and make the value of that key a JSON document (or some other structured format) that contains the count of signals for that key, as well as a map of metadata attributes. To do this we need to modify the DistributedMapCache to support a replace method that takes the key, the expected current value, and the new value. The cache server performs the replace and returns true only if the current value matches the expected value; otherwise the replace is not performed and false is returned.
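To make the proposed replace semantics concrete, here is a minimal sketch of the server side, assuming an in-memory store with String keys and values kept as serialized bytes. The class `InMemoryCacheServer` and its methods are hypothetical and are not the actual NiFi cache server. The point is only that the comparison and the write happen under one lock, so they are atomic.

{code:java}
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the cache server; not NiFi's implementation.
// Keys are kept as Strings for simplicity (an assumption), because byte[] uses
// identity equals/hashCode and cannot serve directly as a HashMap key.
class InMemoryCacheServer {
    private final Map<String, byte[]> entries = new HashMap<>();

    // Returns a copy of the stored serialized value, or null if the key is absent.
    public synchronized byte[] get(String key) {
        byte[] value = entries.get(key);
        return value == null ? null : value.clone();
    }

    // Unconditionally stores a copy of the serialized value.
    public synchronized void put(String key, byte[] value) {
        entries.put(key, value.clone());
    }

    // Compare-and-replace: writes newValue only if the stored bytes equal
    // expectedValue. Both steps run under the same lock, so no other call can
    // change the entry between the comparison and the write.
    public synchronized boolean replace(String key, byte[] expectedValue, byte[] newValue) {
        byte[] current = entries.get(key);
        if (current == null || !Arrays.equals(current, expectedValue)) {
            return false;
        }
        entries.put(key, newValue.clone());
        return true;
    }
}
{code}

Values are compared with `Arrays.equals` rather than `==` because two separately serialized copies of the same document are different array objects.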

So the DistributedMapCacheClient would have methods like:
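{code}
// Atomically replaces the value for key only if the current value equals currentValue.
// Returns true if the replace was performed, false otherwise.
<K, V> boolean replace(K key, V currentValue, V replaceValue, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException;

// Indicates whether this cache implementation supports replace().
boolean isReplaceSupported();
{code}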

isReplaceSupported() lets a cache provider indicate whether it supports this operation, in the event that we eventually provide additional back-end caches. The Wait/Notify processors could use it in custom validation so that they are only valid when a cache with replace capability is selected.
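A minimal sketch of how a processor might turn the capability flag into a validation result follows. It assumes a hypothetical `ReplaceCapableCache` interface and a hypothetical `SignalCacheValidator` helper that returns human-readable problems. Neither is part of NiFi, and a real processor would report these problems through the framework's own custom validation.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal view of a cache client; only the capability flag matters here.
interface ReplaceCapableCache {
    boolean isReplaceSupported();
}

// Hypothetical validation helper: returns a list of problems, which is empty
// when the selected cache can be used for counting signals.
final class SignalCacheValidator {
    private SignalCacheValidator() {}

    static List<String> validate(ReplaceCapableCache cache) {
        List<String> problems = new ArrayList<>();
        if (cache == null) {
            problems.add("A distributed map cache client must be selected");
        } else if (!cache.isReplaceSupported()) {
            problems.add("The selected cache does not support replace(), "
                    + "which is required to count signals");
        }
        return problems;
    }
}
{code}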

So let's say the signal key is "my.signal" and we store the value as JSON like:
{code}
{
  "count" : 0,
  "attributes" : {
     "attr1" : "val1",
     "attr2" : "val2"
  }
}
{code}
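For illustration, here is one way the value could be modelled and serialized into a compact form of the JSON above. It assumes a hypothetical `SignalValue` class and uses hand-rolled string escaping in place of a JSON library. Sorting the attributes keeps the serialized form deterministic, which would matter if the cache compares serialized values byte for byte during replace.

{code:java}
import java.util.Map;
import java.util.TreeMap;

// Hypothetical value object for one signal key: the signal count plus the
// merged attributes, serialized to the JSON shape sketched above.
final class SignalValue {
    final long count;
    final Map<String, String> attributes;

    SignalValue(long count, Map<String, String> attributes) {
        this.count = count;
        // TreeMap gives a stable key order, so equal values serialize identically.
        this.attributes = new TreeMap<>(attributes);
    }

    String toJson() {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"count\":").append(count).append(",\"attributes\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : attributes.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
        }
        return sb.append("}}").toString();
    }

    // Minimal JSON string quoting: escapes quotes, backslashes and control characters.
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }
}
{code}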

When a signal hits the Notify processor, it would try to retrieve the value of "my.signal" from the cache. If nothing is found, it creates the first version of the JSON with a count of 1 and any attributes (based on the regex) and stores it in the cache. If an entry is found, it increments the count in the JSON, adds in any attributes, and then calls replace with the original JSON and the new JSON. If the replace fails, it is because something else updated the cache between the original retrieval and the replace, so it repeats the process (gets the current value again and attempts the replace).
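The Notify retry loop described above is an optimistic compare-and-swap loop. The sketch below assumes a `java.util.concurrent.ConcurrentMap` as a stand-in for the cache client, since its `replace(key, expectedValue, newValue)` has the semantics proposed here (values compared with `equals()`). The `Signal` and `Notifier` classes are hypothetical. The sketch also uses `putIfAbsent` for the first signal, which goes slightly beyond the proposal: without it, two Notify calls that both find no entry could overwrite each other's first write. Attribute merging follows the last-signal-wins rule proposed in the next paragraph.

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

// Hypothetical cached value: signal count plus merged attributes (immutable).
final class Signal {
    final long count;
    final Map<String, String> attributes;

    Signal(long count, Map<String, String> attributes) {
        this.count = count;
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Signal)) {
            return false;
        }
        Signal other = (Signal) o;
        return count == other.count && attributes.equals(other.attributes);
    }

    @Override
    public int hashCode() {
        return Long.hashCode(count) * 31 + attributes.hashCode();
    }
}

final class Notifier {
    private Notifier() {}

    // Records one signal for key, retrying until the update is applied atomically.
    static Signal recordSignal(ConcurrentMap<String, Signal> cache, String key,
                               Map<String, String> newAttributes) {
        while (true) {
            Signal current = cache.get(key);
            if (current == null) {
                // First signal: try to create the entry with a count of 1.
                Signal first = new Signal(1, newAttributes);
                if (cache.putIfAbsent(key, first) == null) {
                    return first;
                }
                continue; // another signal created the entry first; retry
            }
            // Merge attributes: a later signal overwrites an earlier value of the same name.
            Map<String, String> merged = new HashMap<>(current.attributes);
            merged.putAll(newAttributes);
            Signal updated = new Signal(current.count + 1, merged);
            if (cache.replace(key, current, updated)) {
                return updated;
            }
            // replace failed: the entry changed since it was read; loop and try again
        }
    }
}
{code}

A failed replace means some other Notify call succeeded in between, so contention costs extra round trips to the cache but does not lose updates.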

One other thing to consider is the uniqueness of attributes (e.g. if there are 1000 signals and they all have attribute 'foo'). I am proposing that when using these processors in this multi-signal mode we keep it simple and just merge together all the attributes, so each attribute would end up with its value from the last signal.

> Add ability to wait for N signals to Wait/Notify processors
> -----------------------------------------------------------
>
>                 Key: NIFI-3216
>                 URL: https://issues.apache.org/jira/browse/NIFI-3216
>             Project: Apache NiFi
>          Issue Type: Improvement
>    Affects Versions: 1.2.0
>            Reporter: Bryan Bende
>
> The recently added Wait and Notify processors allow a flow file to be held at the Wait processor until a signal is received in the Notify processor. It would be nice to be able to wait for N signals before releasing.
> One way this could be done is to have a property like "Signal Count" on the Wait processor, and then count the keys in the cache starting with some pattern, and release when the # of keys equals the signal count.
> This would require the ability to get all the keys from the cache, or at least get all keys matching a pattern: 
> https://issues.apache.org/jira/browse/NIFI-3214



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)