Posted to issues@nifi.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/03/02 15:28:45 UTC

[jira] [Commented] (NIFI-3545) Let M FlowFiles pass through once N signals arrive

    [ https://issues.apache.org/jira/browse/NIFI-3545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15892415#comment-15892415 ] 

ASF GitHub Bot commented on NIFI-3545:
--------------------------------------

GitHub user ijokarumawak opened a pull request:

    https://github.com/apache/nifi/pull/1554

    NIFI-3545: Release M FlowFiles once N signals arrive

    - Support multiple incoming FlowFiles to Wait processor, up to Wait
      Buffer Count
    - Added Releasable FlowFile Count, which controls how many FlowFiles can
      be released when wait condition is met
    - Added special meaning to Notify delta Zero(0) to clear a signal
      counter back to zero
    
    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [x] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [x] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [x] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ijokarumawak/nifi nifi-3545

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/1554.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1554
    
----
commit 65e95ce8bffa2714843206f19adb916a088deeb1
Author: Koji Kawamura <ij...@apache.org>
Date:   2017-03-02T13:54:46Z

    NIFI-3545: Release M FlowFiles once N signals arrive
    
    - Support multiple incoming FlowFiles to Wait processor, up to Wait
      Buffer Count
    - Added Releasable FlowFile Count, which controls how many FlowFiles can
      be released when wait condition is met
    - Added special meaning to Notify delta Zero(0) to clear a signal
      counter back to zero

----


> Let M FlowFiles pass through once N signals arrive
> --------------------------------------------------
>
>                 Key: NIFI-3545
>                 URL: https://issues.apache.org/jira/browse/NIFI-3545
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>
> If the Wait processor could:
> "Let M flow files pass through once N notify signals have arrived for key K"
> we could support a wider variety of use cases. Currently, it only supports
> "Let 1 flow file pass through once N notify signals have arrived for key K"
> h3. How it works: a simulation
> For example, let's say there are 50 incoming flow files at the beginning, f1 to f50.
> N=3, M=100
> This can be read as "The Wait processor is allowed to convert 3 signals into 100 pass tickets."
> 1. There is no signal for K, so all flow files are waiting.
> 2. Notify sends a signal. K( N=1 ) doesn't meet the Wait condition, so the Wait processor keeps waiting.
> 3. Notify sends another two signals. Now K( N=3 ) meets the Wait condition.
> 4. The Wait processor converts the 3 signals into 100 pass tickets, consumes flow files f1 to f50, and updates K( N=0, M=50 ), where M denotes the remaining number of flow files that can go through.
> 5. Another 30 flow files arrive. The Wait processor consumes f51 to f80 and updates K( N=0, M=20 ).
> 6. Another 30 flow files arrive. The Wait processor consumes f81 to f100, so K is now K( N=0, M=0 ). Since all of N and M are used, the Wait processor removes K. f101 to f110 wait for signals, the same state as #1.
> h4. Alternative path after 6
> 7a. If Notify sends additional signals, then f101 to f110 can go through.
> 7b. If Notify doesn't send any more signals, then f101 to f110 will be routed to expired.
> h4. Alternative path after 5
> 6a. If Notify sends one additional signal at this point, K becomes K( N=1, M=20 ). The Wait processor can still process 20 flow files because it has M=20 remaining.
> 6b. If Notify sends three additional signals, K becomes K( N=3, M=20 ). The Wait processor consumes 20 flow files, and when the 21st flow file arrives, it immediately converts N to M, that is, it consumes N(3) to create M(100) passes, giving K( N=0, M=100 ).
> Additionally, we can let the user configure M=0, meaning Wait can release any number of incoming flow files as long as N meets the condition.
> With this, Notify +1 behaves as if it opens a GATE, and Notify -1 closes it.
> h4. Another possible use case: 'Limit data flow rate cluster-wide'
> This is more complex than just supporting a GATE open/close state. However, if we let M flow files go through, Wait can also provide a rate limit across the cluster.
> Example use case: NiFi A pushes data via S2S to NiFi B and wants to limit the flow to 100 flow files per 5 minutes.
> On NiFi A:
> Notify part of flow: GenerateFlowFile(5 min, on primary) -> Notify(K, N=+1)
> Wait part of flow: Some ingested data -> Wait(K, N=1, M=100)
> Since Wait/Notify state is managed globally via DistributedCache, we can limit throughput cluster-wide.
> If the use case requires limiting the rate exactly, the Notify part can be designed as:
> GenerateFlowFile(5 min, on primary) -> Notify(K, N=0) -> Notify(K, N=+1)
> This prevents N from accumulating when there is no traffic.
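
The N-to-M semantics described in the issue can be sketched as a small state machine. This is only an illustration of the proposal, not the code in the pull request: the class name WaitGate, its method names, and the choice to subtract the target from N on conversion (rather than, say, resetting N to zero) are assumptions made for this example. Expiration and the Wait Buffer Count are not modelled.

```python
class WaitGate:
    """Hypothetical model of one signal key K, holding N (signals) and M (pass tickets)."""

    def __init__(self, target_n, releasable_m):
        self.target_n = target_n          # signals required to meet the Wait condition
        self.releasable_m = releasable_m  # tickets created per conversion; 0 means gate mode
        self.n = 0                        # signals received and not yet converted
        self.m = 0                        # remaining pass tickets

    def notify(self, delta=1):
        """Apply a Notify delta; a delta of 0 clears the counter back to zero."""
        if delta == 0:
            self.n = 0
        else:
            self.n += delta

    def offer(self, count):
        """Offer `count` waiting flow files and return how many are released."""
        if self.releasable_m == 0:
            # Gate mode: everything passes while N meets the condition; N is not consumed.
            return count if self.n >= self.target_n else 0
        released = 0
        while released < count:
            if self.m == 0:
                if self.n < self.target_n:
                    break  # no tickets left and not enough signals: the rest keep waiting
                # Convert N signals into M pass tickets (assumed: subtract the target).
                self.n -= self.target_n
                self.m = self.releasable_m
            take = min(self.m, count - released)
            self.m -= take
            released += take
        return released


# Replay of the simulation with N=3, M=100.
gate = WaitGate(target_n=3, releasable_m=100)
print(gate.offer(50))   # step 1: 0 released, no signal yet
gate.notify()
print(gate.offer(50))   # step 2: 0 released, N=1
gate.notify()
gate.notify()
print(gate.offer(50))   # step 4: 50 released, K(N=0, M=50)
print(gate.offer(30))   # step 5: 30 released, K(N=0, M=20)
print(gate.offer(30))   # step 6: 20 released, 10 flow files keep waiting
```

With releasable_m=0 the same class models the GATE case: notify(+1) opens the gate for any number of flow files and notify(-1) closes it. Calling notify(0) before notify(+1) corresponds to the exact rate limit variant, since it stops N from accumulating across idle periods.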



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)