Posted to issues@nifi.apache.org by "Aldrin Piri (JIRA)" <ji...@apache.org> on 2017/03/06 14:31:32 UTC

[jira] [Resolved] (NIFI-3545) Let M FlowFiles pass through once N signals arrive

     [ https://issues.apache.org/jira/browse/NIFI-3545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aldrin Piri resolved NIFI-3545.
-------------------------------
       Resolution: Done
    Fix Version/s: 1.2.0

> Let M FlowFiles pass through once N signals arrive
> --------------------------------------------------
>
>                 Key: NIFI-3545
>                 URL: https://issues.apache.org/jira/browse/NIFI-3545
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>             Fix For: 1.2.0
>
>
> If the Wait processor could
> "let M flow files pass through once N notify signals have arrived for key K",
> it would support a wider variety of use cases. Currently, it only supports
> "let 1 flow file pass through once N notify signals have arrived for key K".
> h3. How it works: a simulation
> For example, say there are initially 50 incoming flow files, f1 to f50.
> N=3, M=100
> This can be read as "the Wait processor is allowed to convert 3 signals into 100 pass tickets."
> 1. There is no signal for K, so all flow files wait.
> 2. Notify sends a signal. K(N=1) doesn't meet the Wait condition, so the Wait processor keeps waiting.
> 3. Notify sends two more signals. Now K(N=3) matches the Wait condition.
> 4. The Wait processor converts the 3 signals into 100 passes and consumes flow files f1 to f50, updating K to K(N=0, M=50), where M denotes the remaining number of flow files that can go through.
> 5. Another 30 flow files arrive. The Wait processor consumes f51 to f80 and updates K to K(N=0, M=20).
> 6. Another 30 flow files arrive. The Wait processor consumes f81 to f100, and K is now K(N=0, M=0). Since all of N and M have been used, the Wait processor removes K. Flow files f101 to f110 wait for signals, the same state as #1.
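The six steps above can be replayed with a small Python model. This is a minimal sketch, not the Wait processor's implementation: `WaitState`, `notify`, `try_release` and `run` are hypothetical names, and a plain dict stands in for the distributed cache, holding [N, M] per key.

```python
class WaitState:
    """Hypothetical in-memory model of the per-key state K(N, M)."""

    def __init__(self, target_signals, releasable):
        self.target_signals = target_signals  # signals consumed per conversion (N=3)
        self.releasable = releasable          # passes granted per conversion (M=100)
        self.cache = {}                       # key -> [signals N, remaining passes M]

    def notify(self, key, delta=1):
        """Add `delta` signals for `key`, creating K if it does not exist yet."""
        self.cache.setdefault(key, [0, 0])[0] += delta

    def try_release(self, key):
        """Let one flow file through if K has (or can create) a pass."""
        state = self.cache.get(key)
        if state is None:
            return False
        signals, remaining = state
        if remaining == 0:
            if signals < self.target_signals:
                return False
            # Convert N signals into M passes only when the passes run out.
            signals, remaining = signals - self.target_signals, self.releasable
        remaining -= 1
        if signals == 0 and remaining == 0:
            del self.cache[key]  # all of N and M used up: remove K
        else:
            self.cache[key] = [signals, remaining]
        return True


def run(wait, key, queue):
    """Release queued flow files in arrival order until one has to wait."""
    released = []
    while queue and wait.try_release(key):
        released.append(queue.pop(0))
    return released


wait = WaitState(target_signals=3, releasable=100)
queue = [f"f{i}" for i in range(1, 51)]

step1 = run(wait, "K", queue)          # 1. no signal yet: nothing passes
wait.notify("K")
step2 = run(wait, "K", queue)          # 2. K(N=1): still waiting
wait.notify("K")
wait.notify("K")                       # 3. K(N=3): condition met
step4 = run(wait, "K", queue)          # 4. f1..f50 pass, K(N=0, M=50)
state4 = list(wait.cache["K"])
queue += [f"f{i}" for i in range(51, 81)]
step5 = run(wait, "K", queue)          # 5. f51..f80 pass, K(N=0, M=20)
state5 = list(wait.cache["K"])
queue += [f"f{i}" for i in range(81, 111)]
step6 = run(wait, "K", queue)          # 6. f81..f100 pass, K is removed
print(state4, state5, "K" in wait.cache, queue[0], queue[-1])
# prints: [0, 50] [0, 20] False f101 f110
```

In this model the conversion happens inside `try_release`, at the moment the first flow file needs a pass, which is why step 4 ends with N=0.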
> h4. Alternative path after 6
> 7a. If Notify sends additional signals, f101 to f110 can go through.
> 7b. If Notify doesn't send any more signals, f101 to f110 will be routed to the 'expired' relationship.
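Paths 7a and 7b differ only in whether a new signal arrives before the waiting flow files give up. A minimal sketch, assuming a hypothetical `expiration` duration in seconds and a hypothetical `can_release` callback standing in for the K(N, M) check. It tries a release first and only then checks expiry; the processor's actual ordering may differ.

```python
def route_waiting(queue, can_release, now, expiration):
    """Split waiting (name, enqueued_at) pairs into success, expired and wait."""
    success, expired, still_waiting = [], [], []
    for name, enqueued_at in queue:
        if can_release():
            success.append(name)                    # 7a: a pass is available
        elif now - enqueued_at >= expiration:
            expired.append(name)                    # 7b: waited too long
        else:
            still_waiting.append((name, enqueued_at))
    return success, expired, still_waiting


def passes(count):
    """Hypothetical stand-in for K: hands out `count` passes, then refuses."""
    def take():
        nonlocal count
        if count == 0:
            return False
        count -= 1
        return True
    return take


waiting = [(f"f{i}", 0.0) for i in range(101, 111)]  # all enqueued at t=0

# 7a: Notify sent 3 more signals, so 100 new passes exist.
after_signal = route_waiting(waiting, passes(100), now=60.0, expiration=600.0)
# 7b: no more signals; checked before and after the expiration duration.
early = route_waiting(waiting, passes(0), now=60.0, expiration=600.0)
late = route_waiting(waiting, passes(0), now=600.0, expiration=600.0)
```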
> h4. Alternative path after 5
> 6a. If Notify sends one more signal at this point, K becomes K(N=1, M=20). The Wait processor can still release 20 flow files because it has M=20 left.
> 6b. If Notify sends three more signals, K becomes K(N=3, M=20). The Wait processor consumes 20 flow files, and when the 21st flow file arrives, it immediately converts N to M, consuming N(3) to create M(100) passes, so K becomes K(N=0, M=100).
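The difference between 6a and 6b is when the conversion happens: N is only turned into M passes once the current M is exhausted. The hypothetical pure functions below (assuming N=3 and M=100 as in the example) trace both paths starting from the state after step 5.

```python
def consume_one(signals, remaining, target=3, releasable=100):
    """One flow file arrives; return (released, signals, remaining)."""
    if remaining == 0:
        if signals < target:
            return False, signals, remaining  # not enough signals: wait
        # Convert N signals into M passes only now that M has run out.
        signals, remaining = signals - target, releasable
    return True, signals, remaining - 1


def feed(signals, remaining, arrivals):
    """Feed `arrivals` flow files; return (released count, final K)."""
    released = 0
    for _ in range(arrivals):
        ok, signals, remaining = consume_one(signals, remaining)
        released += ok
    return released, (signals, remaining)


# 6a: one extra signal, K(N=1, M=20). 21 arrivals: 20 pass, the 21st waits.
path_6a = feed(1, 20, 21)
# 6b: three extra signals, K(N=3, M=20). The 21st arrival triggers the conversion.
path_6b_before = feed(3, 20, 20)
path_6b_after = feed(3, 20, 21)
```

`path_6b_after` ends at K(N=0, M=99): the K(N=0, M=100) in 6b is the state right after the conversion, before the 21st flow file takes its pass.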
> Additionally, we can let users configure M=0, meaning Wait releases any number of incoming flow files as long as N meets the condition.
> With this, Notify +1 behaves as if it opens a GATE, and Notify -1 closes it.
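With M=0 the signals are never spent, so Wait behaves like a gate that stays open while N meets the condition. A minimal sketch, with a hypothetical `Gate` class and a target of N=1:

```python
class Gate:
    """Hypothetical model of Wait with M=0: signals are checked, never consumed."""

    def __init__(self, target=1):
        self.target = target
        self.signals = 0

    def notify(self, delta):
        self.signals += delta  # Notify +1 opens the gate, Notify -1 closes it

    def release(self, flow_files):
        """Return every flow file that may pass (all or nothing in this model)."""
        return list(flow_files) if self.signals >= self.target else []


gate = Gate()
closed_before = gate.release(["a", "b"])
gate.notify(+1)
open_batch1 = gate.release(["a", "b"])
open_batch2 = gate.release(["c", "d", "e"])  # still open: nothing was consumed
gate.notify(-1)
closed_after = gate.release(["f"])
```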
> h4. Another possible use case: limiting the data flow rate cluster-wide
> This is more complex than just supporting a GATE open/close state. However, if we support letting M flow files go through, Wait/Notify can also provide a cluster-wide rate limit.
> Example use case: NiFi A pushes data via Site-to-Site (S2S) to NiFi B and wants to limit it to 100 flow files per 5 min.
> On NiFi A:
> Notify part of flow: GenerateFlowFile(5 min, on primary) -> Notify(K, N=+1)
> Wait part of flow: Some ingested data -> Wait(K, N=1, M=100)
> Since Wait/Notify state is managed globally via DistributedCache, we can limit throughput cluster-wide.
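Cluster-wide limiting works only if concurrent Wait processors on different nodes never grant the same pass twice. The sketch below assumes the shared cache offers a fetch plus a compare-and-replace on a revision number; `VersionedCache`, `notify` and `try_release` are hypothetical names, threads stand in for cluster nodes, and this is not the DistributedCache client API.

```python
import threading


class VersionedCache:
    """Hypothetical shared cache: fetch plus compare-and-replace on a revision."""

    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}  # key -> (revision, (signals N, remaining M))

    def fetch(self, key):
        with self._lock:
            return self._data.get(key, (0, (0, 0)))

    def replace(self, key, value, expected_revision):
        """Store `value` only if nobody changed `key` since it was fetched."""
        with self._lock:
            revision, _ = self._data.get(key, (0, (0, 0)))
            if revision != expected_revision:
                return False
            self._data[key] = (revision + 1, value)
            return True


def notify(cache, key, delta=1):
    """Notify(K, N=+delta), retrying until the update wins."""
    while True:
        revision, (signals, remaining) = cache.fetch(key)
        if cache.replace(key, (signals + delta, remaining), revision):
            return


def try_release(cache, key, target=1, releasable=100):
    """Wait(K, N=1, M=100): take one pass, retrying if another node raced us."""
    while True:
        revision, (signals, remaining) = cache.fetch(key)
        if remaining == 0:
            if signals < target:
                return False
            signals, remaining = signals - target, releasable
        if cache.replace(key, (signals, remaining - 1), revision):
            return True


cache = VersionedCache()
notify(cache, "K")  # one GenerateFlowFile tick -> Notify(K, N=+1)
released = [0, 0]


def node(index, attempts=80):
    """One cluster node offering `attempts` flow files to its Wait processor."""
    released[index] = sum(try_release(cache, "K") for _ in range(attempts))


threads = [threading.Thread(target=node, args=(i,)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sum(released))  # prints 100: 160 offers, but only one signal's worth of passes
```

The per-node split of the 100 passes varies between runs; only the total is fixed, because a failed compare-and-replace forces the losing node to re-read K before trying again.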
> If a use case requires an exact rate limit, the Notify part can be designed as:
> GenerateFlowFile(5 min, on primary) -> Notify(K, N=0) -> Notify(K, N=+1)
> Notify(K, N=0) first clears the count for K, which prevents N from accumulating when there's no traffic.
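A minimal sketch comparing the two Notify designs over four 5-minute intervals: three idle intervals followed by a burst of 400 flow files. It assumes, as the text implies, that Notify(K, N=0) clears the accumulated signal count; `simulate` is a hypothetical helper, and it resets only N.

```python
def simulate(arrivals_per_interval, reset_before_notify, target=1, releasable=100):
    """Return how many flow files Wait(K, N=1, M=100) releases per interval."""
    signals, remaining, backlog = 0, 0, 0
    released_per_interval = []
    for arrivals in arrivals_per_interval:
        # The 5-minute GenerateFlowFile tick fires at the start of each interval.
        if reset_before_notify:
            signals = 0   # Notify(K, N=0): clear accumulated signals
        signals += 1      # Notify(K, N=+1)
        backlog += arrivals
        released = 0
        while backlog:
            if remaining == 0:
                if signals < target:
                    break  # no pass left: the rest keep waiting
                signals, remaining = signals - target, releasable
            remaining -= 1
            backlog -= 1
            released += 1
        released_per_interval.append(released)
    return released_per_interval


traffic = [0, 0, 0, 400]
accumulating = simulate(traffic, reset_before_notify=False)
resetting = simulate(traffic, reset_before_notify=True)
print(accumulating, resetting)  # prints [0, 0, 0, 400] [0, 0, 0, 100]
```

Without the reset, the three idle ticks leave N=4 by the fourth interval, so the burst gets 400 passes at once; with the reset, it gets the intended 100.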



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)